1use aws_config::BehaviorVersion;
2use aws_sdk_lambda::{
3 types::{Environment, Architecture, Runtime, FunctionCode},
4 primitives::Blob,
5 Client as LambdaClient,
6};
7use aws_sdk_iam::Client as IamClient;
8use aws_sdk_cloudwatch::Client as CloudWatchClient;
9use aws_sdk_eventbridge::Client as EventBridgeClient;
10use aws_sdk_cloudtrail::Client as CloudTrailClient;
11use aws_sdk_s3::Client as S3Client;
12use aws_sdk_sns::Client as SnsClient;
13use aws_types::SdkConfig;
14use std::collections::HashMap;
15use std::time::SystemTime;
16use anyhow::{Result, anyhow};
17use tracing::{info, error, warn};
18use aws_credential_types::Credentials;
19use aws_types::region::Region;
20
21async fn create_aws_config_from_env(aws_region: &str) -> Result<SdkConfig> {
23 dotenv::dotenv().ok();
25
26 let access_key_id = std::env::var("AWS_ACCESS_KEY_ID")
28 .map_err(|_| anyhow!("❌ 错误: AWS_ACCESS_KEY_ID 必须在.env文件中设置"))?;
29 let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY")
30 .map_err(|_| anyhow!("❌ 错误: AWS_SECRET_ACCESS_KEY 必须在.env文件中设置"))?;
31
32 let credentials = Credentials::new(access_key_id, secret_access_key, None, None, "env-file");
34
35 let config = aws_config::defaults(BehaviorVersion::latest())
37 .region(Region::new(aws_region.to_string()))
38 .credentials_provider(credentials)
39 .load()
40 .await;
41
42 validate_credentials(&config).await?;
44
45 info!("✅ AWS凭证已成功从.env文件加载");
46 Ok(config)
47}
48
49async fn validate_credentials(config: &SdkConfig) -> Result<()> {
51 use aws_sdk_sts::Client as StsClient;
52
53 let sts_client = StsClient::new(config);
54
55 match sts_client.get_caller_identity().send().await {
56 Ok(response) => {
57 if let Some(account_id) = response.account() {
58 info!("✅ AWS凭证验证成功 - 账户ID: {}", account_id);
59 }
60 Ok(())
61 }
62 Err(e) => {
63 error!("❌ AWS凭证验证失败: {}", e);
64 Err(anyhow!("AWS凭证无效,请检查.env文件中的凭证配置"))
65 }
66 }
67}
68
/// Bundle of AWS service clients that share a single SDK configuration and region.
///
/// All clients are created from the same [`SdkConfig`] in [`AwsManager::new`].
#[derive(Debug)]
pub struct AwsManager {
    /// AWS Lambda client.
    lambda_client: LambdaClient,
    /// AWS IAM client.
    iam_client: IamClient,
    /// Amazon CloudWatch client.
    cloudwatch_client: CloudWatchClient,
    /// Amazon EventBridge client.
    eventbridge_client: EventBridgeClient,
    /// AWS CloudTrail client.
    cloudtrail_client: CloudTrailClient,
    /// Amazon S3 client.
    s3_client: S3Client,
    /// Amazon SNS client.
    sns_client: SnsClient,
    /// Region name passed to the constructor; used when formatting ARNs.
    region: String,
    /// Shared configuration, kept so additional clients (e.g. STS) can be built on demand.
    sdk_config: SdkConfig,
}
81
82impl AwsManager {
83 pub async fn new(aws_region: &str) -> Result<Self> {
84 let config = create_aws_config_from_env(aws_region).await?;
86
87 let lambda_client = LambdaClient::new(&config);
88 let iam_client = IamClient::new(&config);
89 let cloudwatch_client = CloudWatchClient::new(&config);
90 let eventbridge_client = EventBridgeClient::new(&config);
91 let cloudtrail_client = CloudTrailClient::new(&config);
92 let s3_client = S3Client::new(&config);
93 let sns_client = SnsClient::new(&config);
94
95 Ok(AwsManager {
96 lambda_client,
97 iam_client,
98 cloudwatch_client,
99 eventbridge_client,
100 cloudtrail_client,
101 s3_client,
102 sns_client,
103 region: aws_region.to_string(),
104 sdk_config: config,
105 })
106 }
107
108 pub async fn create_lambda_execution_role(&self) -> Result<String> {
109 let role_name = format!("lambda-bedrock-monitor-role-{}",
110 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_secs());
111 self.create_lambda_execution_role_with_name(&role_name).await
112 }
113
114 pub async fn create_lambda_execution_role_with_name(&self, role_name: &str) -> Result<String> {
115 let trust_policy = serde_json::json!({
116 "Version": "2012-10-17",
117 "Statement": [
118 {
119 "Effect": "Allow",
120 "Principal": {
121 "Service": "lambda.amazonaws.com"
122 },
123 "Action": "sts:AssumeRole"
124 }
125 ]
126 });
127
128 info!("创建IAM角色: {}", role_name);
129
130 let create_role_result = self.iam_client.create_role()
131 .role_name(role_name)
132 .assume_role_policy_document(trust_policy.to_string())
133 .send()
134 .await;
135
136 match create_role_result {
137 Ok(result) => {
138 let role_arn = result.role().unwrap().arn().to_string();
139 info!("IAM角色创建成功: {}", role_arn);
140
141 let managed_policy_arns = vec![
142 "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
143 "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
144 ];
145
146 for policy_arn in managed_policy_arns {
147 let _ = self.iam_client.attach_role_policy()
148 .role_name(role_name)
149 .policy_arn(policy_arn)
150 .send()
151 .await;
152 info!("附加策略: {}", policy_arn);
153 }
154
155 let cloudwatch_logs_policy = r#"{
157 "Version": "2012-10-17",
158 "Statement": [
159 {
160 "Effect": "Allow",
161 "Action": [
162 "logs:CreateLogGroup",
163 "logs:CreateLogStream",
164 "logs:PutLogEvents",
165 "logs:DescribeLogGroups",
166 "logs:DescribeLogStreams"
167 ],
168 "Resource": "arn:aws:logs:*:*:*"
169 }
170 ]
171 }"#;
172
173 match self.iam_client.put_role_policy()
174 .role_name(role_name)
175 .policy_name("CloudWatchLogsWriteAccess")
176 .policy_document(cloudwatch_logs_policy.to_string())
177 .send()
178 .await {
179 Ok(_) => info!("CloudWatch Logs写权限策略添加成功"),
180 Err(e) => warn!("CloudWatch Logs权限策略添加失败: {}", e)
181 }
182
183 let credential_management_policy = r#"{
185 "Version": "2012-10-17",
186 "Statement": [
187 {
188 "Effect": "Allow",
189 "Action": [
190 "iam:UpdateAccessKey",
191 "iam:ListAccessKeys",
192 "iam:GetAccessKeyLastUsed",
193 "iam:GetUser"
194 ],
195 "Resource": "*"
196 }
197 ]
198 }"#;
199
200 match self.iam_client.put_role_policy()
201 .role_name(role_name)
202 .policy_name("BedrockCredentialManagement")
203 .policy_document(credential_management_policy.to_string())
204 .send()
205 .await {
206 Ok(_) => info!("🔐 凭据管理权限策略添加成功"),
207 Err(e) => warn!("⚠️ 凭据管理权限策略添加失败: {}", e)
208 }
209
210 let sns_notification_policy = r#"{
212 "Version": "2012-10-17",
213 "Statement": [
214 {
215 "Effect": "Allow",
216 "Action": [
217 "sns:Publish"
218 ],
219 "Resource": "arn:aws:sns:*:*:bedrock-throttling-alerts"
220 }
221 ]
222 }"#;
223
224 match self.iam_client.put_role_policy()
225 .role_name(role_name)
226 .policy_name("BedrockSNSNotifications")
227 .policy_document(sns_notification_policy.to_string())
228 .send()
229 .await {
230 Ok(_) => info!("📧 SNS通知权限策略添加成功"),
231 Err(e) => warn!("⚠️ SNS通知权限策略添加失败: {}", e)
232 }
233
234 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
235 Ok(role_arn)
236 }
237 Err(e) => {
238 error!("IAM角色创建失败: {}", e);
239 Err(anyhow!("创建IAM角色失败: {}", e))
240 }
241 }
242 }
243
244 pub async fn deploy_lambda_function(&self, zip_content: Vec<u8>) -> Result<String> {
245 let function_name = "bedrock-monitor-function";
246
247 info!("开始部署Lambda函数: {}", function_name);
248 info!("ZIP包大小: {} bytes", zip_content.len());
249
250 let function_exists = self.lambda_client.get_function()
252 .function_name(function_name)
253 .send()
254 .await.is_ok();
255
256 let role_arn = if !function_exists {
258 let role_name = "lambda-bedrock-monitor-role";
260 match self.iam_client.get_role().role_name(role_name).send().await {
261 Ok(role_result) => {
262 info!("使用现有IAM角色: {}", role_name);
263 role_result.role().unwrap().arn().to_string()
264 }
265 Err(_) => {
266 info!("创建新IAM角色: {}", role_name);
268 self.create_lambda_execution_role_with_name(role_name).await?
269 }
270 }
271 } else {
272 info!("函数已存在,获取现有角色ARN");
273 let function_config = self.lambda_client.get_function_configuration()
275 .function_name(function_name)
276 .send()
277 .await?;
278 function_config.role().unwrap().to_string()
279 };
280
281 let env_vars = HashMap::from([
282 ("RUST_LOG".to_string(), "info".to_string()),
283 ("BEDROCK_AUTO_CREDENTIAL_DISABLE".to_string(), "true".to_string()),
284 ("BEDROCK_DRY_RUN_MODE".to_string(), "false".to_string()),
285 ]);
287
288 let environment = Environment::builder()
289 .set_variables(Some(env_vars))
290 .build();
291
292 let function_arn = if function_exists {
293 let function_config = self.lambda_client.get_function_configuration()
295 .function_name(function_name)
296 .send()
297 .await?;
298
299 let current_runtime = function_config.runtime().unwrap_or(&Runtime::Providedal2);
300
301 if !matches!(current_runtime, Runtime::Python39 | Runtime::Python38 | Runtime::Python37) {
303 info!("检测到运行时变更,重新创建Lambda函数");
304
305 let _ = self.lambda_client.delete_function()
307 .function_name(function_name)
308 .send()
309 .await;
310
311 info!("创建新的Python Lambda函数");
313 let create_function_result = self.lambda_client.create_function()
314 .function_name(function_name)
315 .runtime(Runtime::Python312)
316 .handler("lambda_function.lambda_handler")
317 .code(FunctionCode::builder().zip_file(Blob::new(zip_content)).build())
318 .role(role_arn)
319 .architectures(Architecture::X8664)
320 .timeout(30)
321 .memory_size(128)
322 .environment(environment)
323 .send()
324 .await;
325
326 match create_function_result {
327 Ok(result) => {
328 result.function_arn().unwrap().to_string()
329 }
330 Err(e) => {
331 error!("Lambda函数创建失败: {}", e);
332 error!("错误类型: {:?}", e);
333 error!("完整错误信息: {:?}", e);
334 return Err(anyhow!("创建Lambda函数失败: {}", e));
335 }
336 }
337 } else {
338 info!("更新现有Lambda函数");
340 let update_result = self.lambda_client.update_function_code()
341 .function_name(function_name)
342 .zip_file(Blob::new(zip_content))
343 .send()
344 .await;
345
346 match update_result {
347 Ok(result) => result.function_arn().unwrap().to_string(),
348 Err(e) => {
349 error!("Lambda函数更新失败: {}", e);
350 return Err(anyhow!("更新Lambda函数失败: {}", e));
351 }
352 }
353 }
354 } else {
355 info!("创建新Lambda函数");
356 let max_retries = 3;
357
358 for attempt in 1..=max_retries {
359 let create_function_result = self.lambda_client.create_function()
360 .function_name(function_name)
361 .runtime(Runtime::Python312)
362 .handler("lambda_function.lambda_handler")
363 .code(FunctionCode::builder().zip_file(Blob::new(zip_content.clone())).build())
364 .role(&role_arn)
365 .architectures(Architecture::X8664)
366 .timeout(30)
367 .memory_size(128)
368 .environment(environment.clone())
369 .send()
370 .await;
371
372 match create_function_result {
373 Ok(result) => {
374 let function_arn = result.function_arn().unwrap().to_string();
375 info!("✅ Lambda函数创建成功: {}", function_arn);
376 return Ok(function_arn);
377 }
378 Err(e) => {
379 let error_str = e.to_string();
380 let raw_error = format!("{:?}", e);
381
382 error!("❌ Lambda函数创建失败 (尝试 {}/{}):", attempt, max_retries);
383 error!("┌─ 错误信息: {}", e);
384 error!("├─ 错误字符串: {}", error_str);
385 error!("└─ 原始错误对象: {}", raw_error);
386 error!("┌─ 当前区域: {}", self.region);
387 error!("├─ 函数名称: {}", function_name);
388 error!("├─ IAM角色ARN: {}", role_arn);
389 error!("└─ 账户ID: {}", self.get_account_id().await.unwrap_or_else(|_| "unknown".to_string()));
390
391 if error_str.contains("ResourceConflictException") ||
393 error_str.contains("already exists") ||
394 error_str.contains("Function already exist") {
395 info!("ℹ️ Lambda函数已存在,获取函数ARN");
396 match self.lambda_client.get_function_configuration()
398 .function_name(function_name)
399 .send()
400 .await {
401 Ok(config) => {
402 let function_arn = config.function_arn().unwrap().to_string();
403 info!("✅ 获取已存在的Lambda函数: {}", function_arn);
404 return Ok(function_arn);
405 }
406 Err(get_e) => {
407 warn!("⚠️ 无法获取已存在的Lambda函数ARN: {}", get_e);
408 let account_id = self.get_account_id().await?;
410 let default_arn = format!("arn:aws:lambda:{}:{}:function:{}",
411 self.region, account_id, function_name);
412 return Ok(default_arn);
413 }
414 }
415 }
416
417 if attempt < max_retries {
418 let wait_time = attempt * 5; info!("等待 {} 秒后重试...", wait_time);
420 tokio::time::sleep(tokio::time::Duration::from_secs(wait_time as u64)).await;
421 } else {
422 error!("🔍 详细错误分析:");
423
424 if error_str.contains("service error") {
425 error!("┌─ 错误类型: AWS服务错误");
426 error!("├─ 可能原因:");
427 error!("│ • AWS服务暂时不可用");
428 error!("│ • 网络连接问题");
429 error!("│ • AWS服务延迟");
430 error!("│ • 区域服务暂时中断");
431 error!("├─ 建议解决方案:");
432 error!("│ • 检查网络连接");
433 error!("│ • 稍后重试");
434 error!("│ • 尝试使用其他AWS区域");
435 error!("│ • 检查AWS服务状态页面");
436 error!("└─ AWS状态页面: https://status.aws.amazon.com/");
437 } else if error_str.contains("AccessDeniedException") {
438 error!("┌─ 错误类型: 权限不足");
439 error!("├─ 可能原因:");
440 error!("│ • IAM角色无权限在{}区域创建Lambda函数", self.region);
441 error!("│ • IAM角色跨区域权限限制");
442 error!("├─ IAM角色: {}", role_arn);
443 error!("└─ 建议解决方案:");
444 error!(" • 检查IAM角色{}的跨区域权限", role_arn);
445 error!(" • 为IAM角色添加lambda:*权限");
446 error!(" • 或者使用IAM:CreateFunction权限");
447 } else if error_str.contains("InvalidParameterValueException") {
448 error!("┌─ 错误类型: 参数无效");
449 error!("├─ 可能原因:");
450 error!("│ • 函数名、角色ARN或配置参数格式错误");
451 error!("│ • 不支持的配置值");
452 error!("└─ 建议解决方案:");
453 error!(" • 检查函数名是否符合命名规范");
454 error!(" • 验证IAM角色ARN格式");
455 error!(" • 检查运行时和配置参数");
456 } else if error_str.contains("LimitExceededException") {
457 error!("┌─ 错误类型: 资源限制");
458 error!("├─ 可能原因:");
459 error!("│ • 账户Lambda函数数量超限");
460 error!("│ • 并发创建限制");
461 error!("└─ 建议解决方案:");
462 error!(" • 删除不需要的Lambda函数");
463 error!(" • 提高AWS服务配额");
464 } else {
465 error!("┌─ 错误类型: 未知错误");
466 error!("└─ 建议联系AWS技术支持");
467 }
468
469 return Err(anyhow!("创建Lambda函数失败: {}", e));
470 }
471 }
472 }
473 }
474
475 return Err(anyhow!("Lambda函数创建失败: 重试{}次后仍然失败", max_retries));
476 };
477
478 info!("Lambda函数部署成功: {}", function_arn);
479 Ok(function_arn)
480 }
481
482 pub async fn create_cloudwatch_alarm(&self, function_name: &str) -> Result<()> {
483 let alarm_name = format!("{}-error-alarm", function_name);
484
485 info!("创建CloudWatch告警: {}", alarm_name);
486
487 let _account_id = self.get_account_id().await?;
488
489 let dimension1 = aws_sdk_cloudwatch::types::Dimension::builder()
490 .name("FunctionName")
491 .value(function_name)
492 .build();
493
494 let _ = self.cloudwatch_client.put_metric_alarm()
495 .alarm_name(&alarm_name)
496 .alarm_description("Lambda函数错误率告警")
497 .namespace("AWS/Lambda")
498 .metric_name("Errors")
499 .dimensions(dimension1)
500 .statistic(aws_sdk_cloudwatch::types::Statistic::Sum)
501 .period(300)
502 .evaluation_periods(1)
503 .threshold(1.0)
504 .comparison_operator(aws_sdk_cloudwatch::types::ComparisonOperator::GreaterThanOrEqualToThreshold)
505 .send()
506 .await;
507
508 info!("CloudWatch告警创建成功");
509 Ok(())
510 }
511
512 pub async fn create_bedrock_eventbridge_rule(&self) -> Result<String> {
513 let rule_name = "bedrock-api-monitor-rule";
514 let max_retries = 3;
515
516 info!("创建EventBridge规则: {}", rule_name);
517
518 match self.eventbridge_client.list_rules()
520 .name_prefix(rule_name)
521 .send()
522 .await {
523 Ok(response) => {
524 let rules = response.rules();
525 for rule in rules {
526 if let Some(name) = rule.name() {
527 if name == rule_name {
528 let rule_arn = rule.arn().unwrap().to_string();
529 info!("✅ EventBridge规则已存在: {}", rule_arn);
530 return Ok(rule_arn);
531 }
532 }
533 }
534 }
535 Err(e) => {
536 warn!("⚠️ 无法检查EventBridge规则是否已存在: {},继续尝试创建", e);
537 }
538 }
539
540 let event_pattern = r#"{
542 "source": ["aws.bedrock"],
543 "detail-type": ["AWS API Call via CloudTrail"],
544 "detail": {
545 "eventSource": ["bedrock.amazonaws.com"],
546 "eventName": ["InvokeModel", "Converse", "InvokeModelWithResponseStream"]
547 }
548 }"#;
549
550 for attempt in 1..=max_retries {
551 let put_rule_result = self.eventbridge_client.put_rule()
552 .name(rule_name)
553 .event_pattern(event_pattern)
554 .state(aws_sdk_eventbridge::types::RuleState::Enabled)
555 .description("专门监控AWS Bedrock invoke-model API调用 - 记录200和429状态")
556 .send()
557 .await;
558
559 match put_rule_result {
560 Ok(result) => {
561 let rule_arn = result.rule_arn().unwrap().to_string();
562 info!("EventBridge规则创建成功: {}", rule_arn);
563 return Ok(rule_arn);
564 }
565 Err(e) => {
566 let error_str = e.to_string();
567 error!("EventBridge规则创建失败 (尝试 {}/{}): {}", attempt, max_retries, e);
568
569 if error_str.contains("ResourceConflictException") ||
571 error_str.contains("already exists") ||
572 error_str.contains("Rule already exists") {
573 info!("ℹ️ EventBridge规则已存在,跳过创建");
574 match self.eventbridge_client.list_rules()
576 .name_prefix(rule_name)
577 .send()
578 .await {
579 Ok(response) => {
580 let rules = response.rules();
581 for rule in rules {
582 if let Some(name) = rule.name() {
583 if name == rule_name {
584 let rule_arn = rule.arn().unwrap().to_string();
585 info!("✅ 获取已存在的EventBridge规则: {}", rule_arn);
586 return Ok(rule_arn);
587 }
588 }
589 }
590 }
591 Err(list_e) => {
592 warn!("⚠️ 无法获取已存在的EventBridge规则ARN: {}", list_e);
593 let account_id = self.get_account_id().await?;
595 let default_arn = format!("arn:aws:events:{}:{}:rule/{}",
596 self.region, account_id, rule_name);
597 return Ok(default_arn);
598 }
599 }
600 }
601
602 if attempt < max_retries {
603 let wait_time = attempt * 3; info!("等待 {} 秒后重试...", wait_time);
605 tokio::time::sleep(tokio::time::Duration::from_secs(wait_time as u64)).await;
606 } else {
607 if error_str.contains("dispatch failure") {
609 error!("❌ EventBridge服务暂时不可用,可能是网络问题或AWS服务延迟");
610 } else if error_str.contains("InternalFailure") {
611 error!("❌ EventBridge内部服务错误");
612 } else if error_str.contains("AccessDeniedException") {
613 error!("❌ 权限不足: 无权限创建EventBridge规则");
614 } else if error_str.contains("LimitExceededException") {
615 error!("❌ EventBridge规则数量超限");
616 } else {
617 error!("❌ 未知EventBridge错误: {}", error_str);
618 }
619
620 return Err(anyhow!("创建EventBridge规则失败: {}", e));
621 }
622 }
623 }
624 }
625
626 Err(anyhow!("EventBridge规则创建失败: 重试{}次后仍然失败", max_retries))
627 }
628
629 pub async fn add_lambda_target_to_eventbridge(&self) -> Result<()> {
630 let rule_name = "bedrock-api-monitor-rule";
631 let function_name = "bedrock-monitor-function";
632
633 info!("为EventBridge规则添加Lambda目标: {} -> {}", rule_name, function_name);
634
635 let function_config = self.lambda_client.get_function_configuration()
637 .function_name(function_name)
638 .send()
639 .await?;
640
641 let function_arn = function_config.function_arn().unwrap();
642
643 let target = aws_sdk_eventbridge::types::Target::builder()
645 .id("1")
646 .arn(function_arn)
647 .build()?;
648
649 let targets_result = self.eventbridge_client.put_targets()
650 .rule(rule_name)
651 .targets(target)
652 .send()
653 .await;
654
655 match targets_result {
656 Ok(_) => {
657 info!("Lambda目标添加成功");
658
659 let eventbridge_permission_exists = match self.lambda_client.get_policy()
661 .function_name(function_name)
662 .send()
663 .await {
664 Ok(policy_result) => {
665 if let Some(policy) = policy_result.policy() {
666 policy.contains("bedrock-eventbridge-invoke") ||
667 policy.contains("EventBridgeInvoke") ||
668 (policy.contains("events.amazonaws.com") &&
669 policy.contains(&format!("arn:aws:events:{}:{}:rule/{}",
670 self.region, self.get_account_id().await?, rule_name)))
671 } else {
672 false
673 }
674 }
675 Err(_) => false
676 };
677
678 let add_permission_result = if eventbridge_permission_exists {
679 None
680 } else {
681 info!("EventBridge权限不存在,开始添加...");
682 Some(self.lambda_client.add_permission()
683 .function_name(function_name)
684 .statement_id("bedrock-eventbridge-invoke")
685 .action("lambda:InvokeFunction")
686 .principal("events.amazonaws.com")
687 .source_arn(format!("arn:aws:events:{}:{}:rule/{}",
688 self.region,
689 self.get_account_id().await?,
690 rule_name
691 ))
692 .send()
693 .await)
694 };
695
696 match add_permission_result {
697 Some(Ok(response)) => {
698 info!("Lambda EventBridge调用权限添加成功");
699 if let Some(statement) = response.statement() {
700 info!("权限声明: {}", statement);
701 }
702 Ok(())
703 }
704 Some(Err(e)) => {
705 let error_str = e.to_string();
706 let raw_error = format!("{:?}", e);
707 error!("Lambda EventBridge调用权限添加失败");
708 error!("错误信息: {}", error_str);
709 error!("原始错误: {}", raw_error);
710
711 if error_str.contains("ResourceConflictException") ||
713 error_str.contains("already exists") ||
714 error_str.contains("The resource-based policy") ||
715 error_str.contains("already has a statement") {
716 info!("Lambda EventBridge调用权限已存在,跳过添加");
717 Ok(())
718 } else if error_str.contains("InvalidParameterValueException") {
719 warn!("⚠️ EventBridge权限参数无效: 检查函数名和规则ARN格式,但继续部署");
720 Ok(()) } else if error_str.contains("AccessDeniedException") {
722 warn!("⚠️ 权限不足: 无权限为Lambda函数添加EventBridge调用权限,但继续部署");
723 Ok(()) } else if error_str.contains("ResourceNotFoundException") {
725 warn!("⚠️ 资源未找到: Lambda函数或EventBridge规则不存在,但继续部署");
726 Ok(()) } else if error_str.contains("service error") {
728 warn!("⚠️ AWS服务错误,但权限可能已成功添加,继续部署");
729 Ok(()) } else {
731 warn!("Lambda EventBridge调用权限添加失败: {},但继续部署", e);
732 Ok(()) }
734 }
735 None => {
736 info!("EventBridge权限已存在,无需添加");
737 Ok(())
738 }
739 }
740 }
741 Err(e) => {
742 error!("Lambda目标添加失败: {}", e);
743 Err(anyhow!("添加Lambda目标失败: {}", e))
744 }
745 }
746 }
747
748 pub async fn test_lambda_function(&self, payload: &str) -> Result<String> {
749 info!("测试Lambda函数调用,payload: {}", payload);
750
751 match self.lambda_client.invoke()
752 .function_name("bedrock-monitor-function")
753 .payload(aws_sdk_lambda::primitives::Blob::new(payload.as_bytes()))
754 .send()
755 .await {
756 Ok(response) => {
757 info!("Lambda调用成功");
758 let mut result = String::new();
759
760 if let Some(payload) = response.payload() {
761 let payload_str = String::from_utf8(payload.as_ref().to_vec())?;
762 info!("响应: {}", payload_str);
763 result.push_str(&format!("Response: {}\n", payload_str));
764 }
765
766 if let Some(log_result) = response.log_result() {
767 info!("日志: {}", log_result);
768 result.push_str(&format!("Logs: {}\n", log_result));
769 }
770
771 if let Some(function_error) = response.function_error() {
772 let error_str = std::str::from_utf8(function_error.as_ref())
773 .unwrap_or("无法解析错误信息");
774 error!("函数错误: {}", error_str);
775 result.push_str(&format!("Error: {}\n", error_str));
776 }
777
778 Ok(result)
779 }
780 Err(e) => {
781 error!("Lambda调用失败: {}", e);
782 Err(anyhow!("Lambda调用失败: {}", e))
783 }
784 }
785 }
786
787 pub async fn create_lambda_function_url(&self) -> Result<String> {
788 let function_name = "bedrock-monitor-function";
789 info!("创建Lambda Function URL for: {}", function_name);
790
791 let _function_config = self.lambda_client.get_function_configuration()
793 .function_name(function_name)
794 .send()
795 .await?;
796
797 let _create_url_config_result = self.lambda_client.create_function_url_config()
799 .function_name(function_name)
800 .auth_type(aws_sdk_lambda::types::FunctionUrlAuthType::None)
801 .invoke_mode(aws_sdk_lambda::types::InvokeMode::Buffered)
802 .send()
803 .await?;
804
805 let account_id = self.get_account_id().await?;
807
808 let _ = self.lambda_client.add_permission()
809 .function_name(function_name)
810 .statement_id("FunctionURLAllowPublicAccess")
811 .action("lambda:InvokeFunctionUrl")
812 .principal("*")
813 .function_url_auth_type(aws_sdk_lambda::types::FunctionUrlAuthType::None)
814 .source_arn(&format!("arn:aws:lambda:{}:{}:function:{}",
815 self.region, account_id, function_name))
816 .send()
817 .await?;
818
819 let function_url = format!(
821 "https://{}.lambda-url.{}.on.aws",
822 function_name, self.region
823 );
824
825 info!("Lambda Function URL创建成功: {}", function_url);
826 Ok(function_url)
827 }
828
829 pub async fn setup_cloudtrail_for_bedrock(&self) -> Result<String> {
830 let trail_name = "bedrock-api-monitoring-trail";
831 let account_id = self.get_account_id().await?;
832 let s3_bucket_name = format!("bedrock-cloudtrail-logs-{}", account_id);
833
834 info!("设置CloudTrail以监控Bedrock API调用: {}", trail_name);
835
836 self.create_s3_bucket_if_not_exists(&s3_bucket_name).await?;
838
839 let create_trail_result = self.cloudtrail_client.create_trail()
841 .name(trail_name)
842 .s3_bucket_name(s3_bucket_name)
843 .include_global_service_events(true)
844 .is_multi_region_trail(true)
845 .enable_log_file_validation(true)
846 .send()
847 .await;
848
849 let trail_arn = match create_trail_result {
850 Ok(result) => {
851 let arn = result.trail_arn().unwrap().to_string();
852 info!("CloudTrail创建成功: {}", arn);
853 arn
854 }
855 Err(e) => {
856 error!("CloudTrail创建失败: {}", e);
857 return Err(anyhow!("创建CloudTrail失败: {}", e));
858 }
859 };
860
861 info!("配置CloudTrail数据事件...");
863 self.configure_cloudtrail_data_events(trail_name).await?;
864
865 let _ = self.cloudtrail_client.start_logging()
867 .name(trail_name)
868 .send()
869 .await?;
870
871 info!("CloudTrail启动成功");
872 Ok(trail_arn)
873 }
874
875 async fn create_s3_bucket_if_not_exists(&self, bucket_name: &str) -> Result<()> {
876 info!("检查S3存储桶: {}", bucket_name);
877
878 match self.s3_client.head_bucket().bucket(bucket_name).send().await {
880 Ok(_) => {
881 info!("S3存储桶已存在: {}", bucket_name);
882 return Ok(());
883 }
884 Err(_) => {
885 info!("创建新的S3存储桶: {}", bucket_name);
886 }
887 }
888
889 let create_bucket_result = self.s3_client.create_bucket()
891 .bucket(bucket_name)
892 .send()
893 .await;
894
895 match create_bucket_result {
896 Ok(_) => {
897 info!("S3存储桶创建成功: {}", bucket_name);
898 }
899 Err(e) => {
900 error!("S3存储桶创建失败: {}", e);
901 return Err(anyhow!("创建S3存储桶失败: {}", e));
902 }
903 }
904
905 Ok(())
906 }
907
908 async fn configure_cloudtrail_data_events(&self, trail_name: &str) -> Result<()> {
909 info!("配置CloudTrail数据事件以监控Bedrock");
910
911 let account_id = self.get_account_id().await?;
913
914 let data_resource = aws_sdk_cloudtrail::types::DataResource::builder()
917 .r#type("AWS::Bedrock::Model")
918 .values(&format!("arn:aws:bedrock:{}:{}:model/*", self.region, account_id))
919 .build();
920
921 let event_selector = aws_sdk_cloudtrail::types::EventSelector::builder()
922 .read_write_type(aws_sdk_cloudtrail::types::ReadWriteType::All)
923 .include_management_events(true)
924 .data_resources(data_resource)
925 .build();
926
927 let _ = self.cloudtrail_client.put_event_selectors()
929 .trail_name(trail_name)
930 .event_selectors(event_selector)
931 .send()
932 .await?;
933
934
935 info!("CloudTrail数据事件配置完成");
936 Ok(())
937 }
938
939 pub async fn add_eventbridge_lambda_permission(&self) -> Result<()> {
940 let function_name = "bedrock-monitor-function";
941 let statement_id = "EventBridgeInvoke";
942 let action = "lambda:InvokeFunction";
943 let principal = "events.amazonaws.com";
944
945 let account_id = self.get_account_id().await?;
946 let source_arn = format!("arn:aws:events:{}:{}:rule/bedrock-api-monitor-rule",
947 self.region, account_id);
948
949 info!("添加EventBridge调用Lambda权限: {} -> {}", source_arn, function_name);
950
951 let result = self.lambda_client.add_permission()
952 .function_name(function_name)
953 .statement_id(statement_id)
954 .action(action)
955 .principal(principal)
956 .source_arn(&source_arn)
957 .send()
958 .await;
959
960 match result {
961 Ok(_) => {
962 info!("EventBridge Lambda权限添加成功");
963 Ok(())
964 }
965 Err(e) => {
966 error!("EventBridge Lambda权限添加失败: {}", e);
967 Err(anyhow!("添加Lambda EventBridge调用权限失败: {}", e))
968 }
969 }
970 }
971
972 async fn get_account_id(&self) -> Result<String> {
973 let sts_client = aws_sdk_sts::Client::new(&self.sdk_config);
974 let identity = sts_client.get_caller_identity().send().await?;
975 Ok(identity.account().unwrap().to_string())
976 }
977
978 pub async fn create_sns_topic(&self) -> Result<String> {
980 let topic_name = "bedrock-throttling-alerts";
981
982 info!("创建SNS主题: {}", topic_name);
983
984 let result = self.sns_client.create_topic()
985 .name(topic_name)
986 .send()
987 .await;
988
989 match result {
990 Ok(response) => {
991 let topic_arn = response.topic_arn().unwrap().to_string();
992 info!("SNS主题创建成功: {}", topic_arn);
993 Ok(topic_arn)
994 }
995 Err(e) => {
996 error!("SNS主题创建失败: {}", e);
997 Err(anyhow!("创建SNS主题失败: {}", e))
998 }
999 }
1000 }
1001
1002 pub async fn create_bedrock_cloudwatch_alarms(&self, sns_topic_arn: &str) -> Result<()> {
1004 info!("开始创建Bedrock监控CloudWatch告警...");
1005
1006 self.create_single_alarm(
1010 "bedrock-InvocationThrottles-Immediate",
1011 "立即响应告警 - 检测到1次429时就触发AK/SK关闭",
1012 "AWS/Bedrock",
1013 "InvocationThrottles",
1014 10, 1, 1.0, Some(sns_topic_arn),
1018 None,
1019 ).await?;
1020
1021 self.create_single_alarm(
1023 "bedrock-InvocationThrottles-HighFrequency",
1024 "高频Bedrock API限流告警 - 当Bedrock API限流频率异常时触发",
1025 "AWS/Bedrock",
1026 "InvocationThrottles",
1027 60, 2, 10.0, Some(sns_topic_arn),
1031 None,
1032 ).await?;
1033
1034 let general_dimension = aws_sdk_cloudwatch::types::Dimension::builder()
1036 .name("ModelId")
1037 .value("*")
1038 .build();
1039
1040 self.create_single_alarm(
1041 "bedrock-InvocationThrottles-General",
1042 "通用Bedrock API限流告警 - 对任何模型限流超过3次时告警",
1043 "AWS/Bedrock",
1044 "InvocationThrottles",
1045 60, 3, 5.0, Some(sns_topic_arn),
1049 Some(vec![general_dimension]),
1050 ).await?;
1051
1052 self.create_model_specific_alarm(
1054 "anthropic.claude-3-5-sonnet-20240620-v1:0",
1055 sns_topic_arn,
1056 ).await?;
1057
1058 self.create_model_specific_alarm(
1060 "anthropic.claude-3-sonnet-20240229-v1:0",
1061 sns_topic_arn,
1062 ).await?;
1063
1064 self.create_model_specific_alarm(
1066 "anthropic.claude-3-5-haiku-20241022-v1:0",
1067 sns_topic_arn,
1068 ).await?;
1069
1070 self.create_single_alarm(
1072 "bedrock-InvocationClientErrors",
1073 "Bedrock客户端错误告警 - 当客户端错误超过5次时告警",
1074 "AWS/Bedrock",
1075 "InvocationClientErrors",
1076 60, 2, 5.0, Some(sns_topic_arn),
1080 None,
1081 ).await?;
1082
1083 self.create_single_alarm(
1085 "bedrock-InvocationServerErrors",
1086 "Bedrock服务器错误告警 - 当服务器错误超过3次时告警",
1087 "AWS/Bedrock",
1088 "InvocationServerErrors",
1089 60, 1, 3.0, Some(sns_topic_arn),
1093 None,
1094 ).await?;
1095
1096 info!("✅ 所有Bedrock监控CloudWatch告警创建完成");
1097 Ok(())
1098 }
1099
1100 async fn create_single_alarm(
1102 &self,
1103 alarm_name: &str,
1104 description: &str,
1105 namespace: &str,
1106 metric_name: &str,
1107 period: i64,
1108 evaluation_periods: i32,
1109 threshold: f64,
1110 sns_topic_arn: Option<&str>,
1111 dimensions: Option<Vec<aws_sdk_cloudwatch::types::Dimension>>,
1112 ) -> Result<()> {
1113 info!("创建CloudWatch告警: {}", alarm_name);
1114
1115 let mut alarm_request = self.cloudwatch_client.put_metric_alarm()
1116 .alarm_name(alarm_name)
1117 .alarm_description(description)
1118 .namespace(namespace)
1119 .metric_name(metric_name)
1120 .statistic(aws_sdk_cloudwatch::types::Statistic::Sum)
1121 .period(period.try_into()?)
1122 .evaluation_periods(evaluation_periods)
1123 .threshold(threshold)
1124 .comparison_operator(aws_sdk_cloudwatch::types::ComparisonOperator::GreaterThanThreshold);
1125
1126 if let Some(topic_arn) = sns_topic_arn {
1128 alarm_request = alarm_request.alarm_actions(topic_arn);
1129 }
1130
1131 if let Some(dims) = dimensions {
1133 for dim in dims {
1134 alarm_request = alarm_request.dimensions(dim);
1135 }
1136 }
1137
1138 match alarm_request.send().await {
1139 Ok(_) => {
1140 info!("CloudWatch告警创建成功: {}", alarm_name);
1141 Ok(())
1142 }
1143 Err(e) => {
1144 error!("CloudWatch告警创建失败 {}: {}", alarm_name, e);
1145 Err(anyhow!("创建CloudWatch告警失败: {}", e))
1146 }
1147 }
1148 }
1149
1150 async fn create_model_specific_alarm(&self, model_id: &str, sns_topic_arn: &str) -> Result<()> {
1152 let alarm_name = format!("bedrock-InvocationThrottles-{}", model_id.replace(':', "-"));
1153 let description = format!("Claude模型 {} 限流告警 - 当模型1分钟内限流超过3次时告警", model_id);
1154
1155 let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
1156 .name("ModelId")
1157 .value(model_id)
1158 .build();
1159
1160 self.create_single_alarm(
1161 &alarm_name,
1162 &description,
1163 "AWS/Bedrock",
1164 "InvocationThrottles",
1165 60, 1, 3.0, Some(sns_topic_arn),
1169 Some(vec![dimension]),
1170 ).await
1171 }
1172
1173 pub async fn configure_sns_lambda_trigger(&self, sns_topic_arn: &str) -> Result<()> {
1175 let function_name = "bedrock-monitor-function";
1176
1177 info!("配置SNS到Lambda的触发器: {} -> {}", sns_topic_arn, function_name);
1178
1179 let function_config = self.lambda_client.get_function_configuration()
1181 .function_name(function_name)
1182 .send()
1183 .await?;
1184
1185 let function_arn = function_config.function_arn().unwrap();
1186
1187 let result = self.sns_client.subscribe()
1189 .topic_arn(sns_topic_arn)
1190 .protocol("lambda")
1191 .endpoint(function_arn)
1192 .return_subscription_arn(true)
1193 .send()
1194 .await;
1195
1196 match result {
1197 Ok(response) => {
1198 let subscription_arn = response.subscription_arn().unwrap().to_string();
1199 info!("SNS订阅创建成功: {}", subscription_arn);
1200
1201 match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
1203 Ok(_) => info!("SNS Lambda权限添加成功"),
1204 Err(e) => {
1205 warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
1208 }
1209 }
1210
1211 Ok(())
1212 }
1213 Err(e) => {
1214 let error_str = e.to_string();
1215 error!("SNS订阅创建失败: {}", e);
1216
1217 if error_str.contains("already exists") ||
1219 error_str.contains("ResourceConflictException") ||
1220 error_str.contains("Duplicate subscription") ||
1221 error_str.contains("already a subscription") {
1222 info!("SNS订阅已存在,跳过创建");
1223 match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
1225 Ok(_) => info!("SNS Lambda权限添加成功"),
1226 Err(e) => {
1227 warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
1228 }
1229 }
1230 Ok(())
1231 } else if error_str.contains("dispatch failure") {
1232 warn!("⚠️ SNS订阅可能已存在或网络问题,但继续部署");
1233 match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
1235 Ok(_) => info!("SNS Lambda权限添加成功"),
1236 Err(e) => {
1237 warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
1238 }
1239 }
1240 Ok(())
1241 } else if error_str.contains("InvalidParameterException") {
1242 error!("❌ SNS订阅参数无效: 检查主题ARN或Lambda函数ARN格式");
1243 Err(anyhow!("SNS订阅参数无效: {}", error_str))
1244 } else if error_str.contains("AccessDeniedException") {
1245 error!("❌ 权限不足: 无权限创建SNS订阅");
1246 Err(anyhow!("权限不足: {}", error_str))
1247 } else if error_str.contains("NotFoundException") {
1248 error!("❌ 资源未找到: SNS主题或Lambda函数不存在");
1249 Err(anyhow!("资源未找到: {}", error_str))
1250 } else {
1251 error!("❌ 未知SNS订阅错误: {}", error_str);
1252 Err(anyhow!("创建SNS订阅失败: {}", e))
1253 }
1254 }
1255 }
1256 }
1257
1258 async fn add_sns_lambda_permission(&self, function_name: &str, sns_topic_arn: &str) -> Result<()> {
1260 let statement_id = "sns-invoke-permission";
1261
1262 info!("添加SNS调用Lambda权限: {} -> {}", sns_topic_arn, function_name);
1263
1264 let sns_permission_exists = match self.lambda_client.get_policy()
1266 .function_name(function_name)
1267 .send()
1268 .await {
1269 Ok(policy_result) => {
1270 if let Some(policy) = policy_result.policy() {
1271 policy.contains("sns-invoke-permission") ||
1272 (policy.contains("sns.amazonaws.com") && policy.contains(sns_topic_arn))
1273 } else {
1274 false
1275 }
1276 }
1277 Err(_) => false
1278 };
1279
1280 if sns_permission_exists {
1281 info!("✅ SNS权限已存在,跳过添加");
1282 return Ok(());
1283 }
1284
1285 let result = self.lambda_client.add_permission()
1286 .function_name(function_name)
1287 .statement_id(statement_id)
1288 .action("lambda:InvokeFunction")
1289 .principal("sns.amazonaws.com")
1290 .source_arn(sns_topic_arn)
1291 .send()
1292 .await;
1293
1294 match result {
1295 Ok(response) => {
1296 info!("SNS Lambda权限添加成功");
1297 if let Some(statement) = response.statement() {
1298 info!("权限声明: {}", statement);
1299 }
1300 Ok(())
1301 }
1302 Err(e) => {
1303 let error_str = e.to_string();
1304 let raw_error = format!("{:?}", e);
1305 error!("SNS Lambda权限添加失败");
1306 error!("错误信息: {}", error_str);
1307 error!("原始错误: {}", raw_error);
1308
1309 match self.lambda_client.get_policy()
1311 .function_name(function_name)
1312 .send()
1313 .await {
1314 Ok(policy_result) => {
1315 if let Some(policy) = policy_result.policy() {
1316 info!("当前Lambda函数策略已存在");
1317 if policy.contains("sns-invoke-permission") {
1318 info!("✅ SNS权限已存在于策略中,无需重复添加");
1319 return Ok(());
1320 }
1321 }
1322 }
1323 Err(_) => {
1324 warn!("无法检查Lambda函数策略状态");
1325 }
1326 }
1327
1328 if error_str.contains("ResourceConflictException") ||
1330 error_str.contains("already exists") ||
1331 error_str.contains("The resource-based policy") ||
1332 error_str.contains("already has a statement") {
1333 info!("SNS Lambda权限已存在,跳过添加");
1334 Ok(())
1335 } else if error_str.contains("InvalidParameterValueException") {
1336 error!("❌ SNS权限参数无效: 检查函数名和SNS主题ARN格式");
1337 Err(anyhow!("SNS权限参数无效: {}", error_str))
1338 } else if error_str.contains("AccessDeniedException") {
1339 error!("❌ 权限不足: 无权限为Lambda函数添加SNS调用权限");
1340 Err(anyhow!("权限不足: {}", error_str))
1341 } else if error_str.contains("ResourceNotFoundException") {
1342 error!("❌ 资源未找到: Lambda函数或SNS主题不存在");
1343 Err(anyhow!("资源未找到: {}", error_str))
1344 } else if error_str.contains("service error") {
1345 warn!("⚠️ AWS服务错误,但权限可能已成功添加,继续部署");
1346 Ok(())
1347 } else {
1348 error!("❌ 未知SNS权限错误: {}", error_str);
1349 Err(anyhow!("添加SNS Lambda权限失败: {}", e))
1350 }
1351 }
1352 }
1353 }
1354
1355 pub async fn deploy_complete_monitoring_stack(&self, zip_content: Vec<u8>) -> Result<()> {
1357 info!("🚀 开始部署完整的Bedrock监控基础设施...");
1358
1359 let function_arn = self.deploy_lambda_function(zip_content).await?;
1361 info!("✅ Lambda函数部署成功: {}", function_arn);
1362
1363 let max_retries = 3;
1365
1366 for attempt in 1..=max_retries {
1367 match self.update_lambda_environment().await {
1368 Ok(_) => {
1369 info!("✅ Lambda环境变量更新成功 (尝试 {})", attempt);
1370 break;
1371 }
1372 Err(e) => {
1373 warn!("⚠️ Lambda环境变量更新失败 (尝试 {}/{}): {}", attempt, max_retries, e);
1374 if attempt < max_retries {
1375 info!("等待 {} 秒后重试...", attempt * 5);
1376 tokio::time::sleep(tokio::time::Duration::from_secs(attempt as u64 * 5)).await;
1377 } else {
1378 error!("❌ Lambda环境变量更新最终失败,但继续部署流程");
1379 }
1380 }
1381 }
1382 }
1383
1384 let sns_topic_arn = self.create_sns_topic().await?;
1386 info!("✅ SNS主题创建成功: {}", sns_topic_arn);
1387
1388 self.create_bedrock_cloudwatch_alarms(&sns_topic_arn).await?;
1390 info!("✅ CloudWatch告警创建成功");
1391
1392 let rule_arn = self.create_bedrock_eventbridge_rule().await?;
1394 info!("✅ EventBridge规则创建成功: {}", rule_arn);
1395
1396 self.add_lambda_target_to_eventbridge().await?;
1398 info!("✅ EventBridge到Lambda触发器配置成功");
1399
1400 self.configure_sns_lambda_trigger(&sns_topic_arn).await?;
1402 info!("✅ SNS到Lambda触发器配置成功");
1403
1404 self.create_cloudwatch_alarm("bedrock-monitor-function").await?;
1406 info!("✅ Lambda错误告警创建成功");
1407
1408 info!("🎉 完整的Bedrock监控基础设施部署成功!");
1409 info!("📊 监控功能包括:");
1410 info!(" - 429错误检测和AK/SK自动关闭");
1411 info!(" - 8个CloudWatch告警");
1412 info!(" - EventBridge API调用监控");
1413 info!(" - SNS告警通知");
1414 info!(" - 实时CloudWatch日志记录");
1415
1416 Ok(())
1417 }
1418
1419 pub async fn update_lambda_environment(&self) -> Result<()> {
1421 let function_name = "bedrock-monitor-function";
1422
1423 info!("更新Lambda函数环境变量: {}", function_name);
1424
1425 match self.lambda_client.get_function_configuration()
1427 .function_name(function_name)
1428 .send()
1429 .await {
1430 Ok(function_config) => {
1431 info!("✅ Lambda函数存在,状态: {:?}", function_config.state());
1432
1433 match function_config.state() {
1435 Some(aws_sdk_lambda::types::State::Pending) => {
1436 warn!("⚠️ Lambda函数正在更新中,等待完成...");
1437 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
1439 }
1440 Some(aws_sdk_lambda::types::State::Active) => {
1441 info!("✅ Lambda函数状态正常");
1442 }
1443 Some(aws_sdk_lambda::types::State::Failed) | Some(aws_sdk_lambda::types::State::Inactive) => {
1444 return Err(anyhow!("❌ Lambda函数状态异常: {:?}", function_config.state()));
1445 }
1446 None => {
1447 warn!("⚠️ 无法获取Lambda函数状态,继续尝试更新");
1448 }
1449 _ => {
1450 warn!("⚠️ Lambda函数处于未知状态: {:?}, 继续尝试更新", function_config.state());
1451 }
1452 }
1453 }
1454 Err(e) => {
1455 error!("❌ 无法获取Lambda函数配置: {}", e);
1456 return Err(anyhow!("Lambda函数不存在或无权限访问: {}", e));
1457 }
1458 }
1459
1460 let env_vars = HashMap::from([
1461 ("RUST_LOG".to_string(), "info".to_string()),
1462 ("BEDROCK_AUTO_CREDENTIAL_DISABLE".to_string(), "true".to_string()),
1463 ("BEDROCK_DRY_RUN_MODE".to_string(), "false".to_string()), ]);
1466
1467 let total_size: usize = env_vars.iter()
1469 .map(|(k, v)| k.len() + v.len() + 2) .sum();
1471
1472 info!("环境变量总大小: {} bytes (限制: 4096 bytes)", total_size);
1473 if total_size > 4096 {
1474 return Err(anyhow!("环境变量总大小超过AWS Lambda限制(4KB): {} bytes", total_size));
1475 }
1476
1477 let environment = Environment::builder()
1478 .set_variables(Some(env_vars))
1479 .build();
1480
1481 info!("正在发送更新请求...");
1482 match self.lambda_client.update_function_configuration()
1483 .function_name(function_name)
1484 .environment(environment)
1485 .send()
1486 .await {
1487 Ok(response) => {
1488 info!("✅ Lambda环境变量更新请求已发送");
1489 info!("函数状态: {:?}", response.state());
1490 info!("最后更新时间: {:?}", response.last_modified());
1491
1492 info!("等待配置更新生效...");
1494 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
1495
1496 match self.lambda_client.get_function_configuration()
1498 .function_name(function_name)
1499 .send()
1500 .await {
1501 Ok(updated_config) => {
1502 if let Some(env) = updated_config.environment() {
1503 if let Some(vars) = env.variables() {
1504 info!("✅ 环境变量验证成功:");
1505 for (key, value) in vars {
1506 info!(" {} = {}", key, value);
1507 }
1508 }
1509 }
1510 info!("✅ Lambda环境变量更新完成");
1511 }
1512 Err(e) => {
1513 warn!("⚠️ 无法验证环境变量更新: {}", e);
1514 }
1515 }
1516
1517 Ok(())
1518 }
1519 Err(e) => {
1520 let error_str = e.to_string();
1521 let raw_error = format!("{:?}", e);
1522
1523 match self.lambda_client.get_function_configuration()
1525 .function_name(function_name)
1526 .send()
1527 .await {
1528 Ok(config) => {
1529 if let Some(env) = config.environment() {
1530 if let Some(vars) = env.variables() {
1531 let has_correct_vars = vars.get("BEDROCK_DRY_RUN_MODE") == Some(&"false".to_string()) &&
1532 vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE") == Some(&"true".to_string()) &&
1533 vars.get("RUST_LOG") == Some(&"info".to_string());
1534
1535 if has_correct_vars {
1536 info!("✅ Lambda环境变量已正确设置,无需重复更新");
1537 if error_str.contains("ResourceConflictException") {
1538 info!("ℹ️ 检测到Lambda函数正在更新,但环境变量已正确配置");
1539 } else if error_str.contains("service error") {
1540 info!("ℹ️ 检测到AWS服务错误,但环境变量已正确配置");
1541 }
1542 return Ok(());
1543 }
1544 }
1545 }
1546 }
1547 Err(_) => {
1548 warn!("无法验证当前环境变量状态");
1550 }
1551 }
1552
1553 error!("❌ Lambda环境变量更新失败");
1555 error!("错误信息: {}", error_str);
1556 error!("原始错误: {}", raw_error);
1557
1558 if error_str.contains("ResourceConflictException") {
1560 error!("❌ 资源冲突: Lambda函数正在被其他操作更新");
1561 } else if error_str.contains("InvalidParameterValueException") {
1562 error!("❌ 参数无效: 环境变量格式或内容有问题");
1563 } else if error_str.contains("TooManyRequestsException") {
1564 error!("❌ 请求过多: AWS API调用频率超限");
1565 } else if error_str.contains("AccessDeniedException") {
1566 error!("❌ 权限不足: 无权限更新Lambda函数配置");
1567 } else if error_str.contains("ResourceNotFoundException") {
1568 error!("❌ 资源未找到: Lambda函数不存在");
1569 } else if error_str.contains("service error") {
1570 warn!("⚠️ AWS服务错误,可能环境变量已更新,将进行验证");
1571 return Err(anyhow!("更新Lambda环境变量失败: {}", e));
1574 } else {
1575 error!("❌ 未知错误类型: {}", error_str);
1576 }
1577
1578 Err(anyhow!("更新Lambda环境变量失败: {}", e))
1579 }
1580 }
1581 }
1582
1583 pub async fn update_iam_credentials_policy(&self) -> Result<()> {
1585 let role_name = "lambda-bedrock-monitor-role";
1586
1587 info!("更新IAM角色AK/SK管理权限: {}", role_name);
1588
1589 let credential_management_policy = r#"{
1591 "Version": "2012-10-17",
1592 "Statement": [
1593 {
1594 "Effect": "Allow",
1595 "Action": [
1596 "iam:UpdateAccessKey",
1597 "iam:ListAccessKeys",
1598 "iam:GetAccessKeyLastUsed",
1599 "iam:GetUser"
1600 ],
1601 "Resource": "*"
1602 }
1603 ]
1604 }"#;
1605
1606 match self.iam_client.put_role_policy()
1607 .role_name(role_name)
1608 .policy_name("BedrockCredentialManagement")
1609 .policy_document(credential_management_policy.to_string())
1610 .send()
1611 .await {
1612 Ok(_) => {
1613 info!("✅ IAM角色AK/SK管理权限更新成功");
1614 Ok(())
1615 }
1616 Err(e) => {
1617 error!("❌ IAM角色AK/SK管理权限更新失败: {}", e);
1618 Err(anyhow!("更新IAM角色权限失败: {}", e))
1619 }
1620 }
1621 }
1622
1623 pub async fn fix_ak_sk_auto_disable(&self) -> Result<()> {
1625 info!("🔧 开始修复AK/SK自动关闭功能...");
1626
1627 self.update_iam_credentials_policy().await?;
1629 info!("✅ IAM权限已更新支持多区域");
1630
1631 self.update_lambda_environment().await?;
1633 info!("✅ Lambda环境变量已更新(关闭试运行模式)");
1634
1635 info!("🎉 AK/SK自动关闭功能修复完成!");
1636 info!("📊 修复内容:");
1637 info!(" - IAM权限现在支持所有区域");
1638 info!(" - BEDROCK_DRY_RUN_MODE=false(正式运行模式)");
1639 info!(" - BEDROCK_AUTO_CREDENTIAL_DISABLE=true(启用自动禁用)");
1640
1641 Ok(())
1642 }
1643
1644 pub async fn delete_lambda_function(&self) -> Result<()> {
1646 let function_name = "bedrock-monitor-function";
1647 info!("删除Lambda函数: {}", function_name);
1648
1649 match self.lambda_client.delete_function().function_name(function_name).send().await {
1650 Ok(_) => info!("✅ Lambda函数删除成功"),
1651 Err(e) => {
1652 let error_str = e.to_string();
1653 if error_str.contains("ResourceNotFoundException") {
1654 info!("ℹ️ Lambda函数不存在,无需删除");
1655 } else {
1656 warn!("❌ Lambda函数删除失败: {}", e);
1657 return Err(anyhow!("删除Lambda函数失败: {}", e));
1658 }
1659 }
1660 }
1661 Ok(())
1662 }
1663
1664 pub async fn uninstall_complete_monitoring_stack(&self) -> Result<()> {
1666 info!("🗑️ 开始卸载完整的Bedrock监控基础设施...");
1667
1668 self.delete_all_bedrock_alarms().await?;
1670
1671 self.delete_eventbridge_rule().await?;
1673
1674 self.delete_sns_topic().await?;
1676
1677 self.delete_lambda_function().await?;
1679
1680 info!("ℹ️ IAM角色将被保留,以供将来重新部署使用");
1683
1684 info!("✅ 监控基础设施卸载完成");
1685 Ok(())
1686 }
1687
1688 pub async fn delete_all_bedrock_alarms(&self) -> Result<()> {
1689 info!("正在查找并删除以 'bedrock-' 开头的CloudWatch告警...");
1690 let mut next_token = None;
1691 let mut alarm_names = Vec::new();
1692
1693 loop {
1694 let output = self.cloudwatch_client.describe_alarms()
1695 .alarm_name_prefix("bedrock-")
1696 .set_next_token(next_token.clone())
1697 .send()
1698 .await?;
1699
1700 for alarm in output.metric_alarms() {
1701 if let Some(name) = alarm.alarm_name() {
1702 alarm_names.push(name.to_string());
1703 }
1704 }
1705
1706 next_token = output.next_token().map(|s| s.to_string());
1707 if next_token.is_none() {
1708 break;
1709 }
1710 }
1711
1712 if alarm_names.is_empty() {
1713 info!("ℹ️ 未找到相关告警");
1714 return Ok(());
1715 }
1716
1717 for chunk in alarm_names.chunks(100) {
1719 match self.cloudwatch_client.delete_alarms()
1720 .set_alarm_names(Some(chunk.to_vec()))
1721 .send()
1722 .await {
1723 Ok(_) => info!("✅ 已删除 {} 个告警", chunk.len()),
1724 Err(e) => warn!("⚠️ 删除告警失败: {}", e),
1725 }
1726 }
1727
1728 Ok(())
1729 }
1730
1731 pub async fn delete_eventbridge_rule(&self) -> Result<()> {
1732 let rule_name = "bedrock-api-monitor-rule";
1733 info!("删除EventBridge规则: {}", rule_name);
1734
1735 match self.eventbridge_client.list_targets_by_rule().rule(rule_name).send().await {
1737 Ok(output) => {
1738 let ids: Vec<String> = output.targets().iter().map(|t| t.id().to_string()).collect();
1739 if !ids.is_empty() {
1740 match self.eventbridge_client.remove_targets()
1741 .rule(rule_name)
1742 .set_ids(Some(ids))
1743 .send()
1744 .await {
1745 Ok(_) => info!("✅ 已移除规则目标"),
1746 Err(e) => warn!("⚠️ 移除规则目标失败: {}", e),
1747 }
1748 }
1749 }
1750 Err(e) => {
1751 let err_str = e.to_string();
1752 if err_str.contains("ResourceNotFoundException") {
1753 info!("ℹ️ EventBridge规则不存在,跳过目标移除");
1754 return Ok(());
1755 }
1756 warn!("⚠️ 无法获取规则目标: {}", e);
1757 }
1758 }
1759
1760 match self.eventbridge_client.delete_rule().name(rule_name).send().await {
1762 Ok(_) => info!("✅ EventBridge规则删除成功"),
1763 Err(e) => {
1764 let err_str = e.to_string();
1765 if err_str.contains("ResourceNotFoundException") {
1766 info!("ℹ️ EventBridge规则不存在");
1767 } else {
1768 warn!("⚠️ EventBridge规则删除失败: {}", e);
1769 }
1770 }
1771 }
1772 Ok(())
1773 }
1774
1775 pub async fn delete_sns_topic(&self) -> Result<()> {
1776 let topic_name = "bedrock-throttling-alerts";
1777 info!("删除SNS主题: {}", topic_name);
1778
1779 let account_id = match self.get_account_id().await {
1781 Ok(id) => id,
1782 Err(e) => {
1783 warn!("⚠️ 无法获取Account ID,跳过SNS删除: {}", e);
1784 return Ok(());
1785 }
1786 };
1787 let topic_arn = format!("arn:aws:sns:{}:{}:{}", self.region, account_id, topic_name);
1788
1789 match self.sns_client.delete_topic().topic_arn(&topic_arn).send().await {
1790 Ok(_) => info!("✅ SNS主题删除请求已发送"),
1791 Err(e) => {
1792 let err_str = e.to_string();
1793 if err_str.contains("NotFound") {
1794 info!("ℹ️ SNS主题不存在");
1795 } else {
1796 warn!("⚠️ SNS主题删除失败: {}", e);
1797 }
1798 }
1799 }
1800 Ok(())
1801 }
1802
1803 pub async fn delete_iam_role(&self) -> Result<()> {
1804 let role_name = "lambda-bedrock-monitor-role";
1805 info!("删除IAM角色: {}", role_name);
1806
1807 let policies = vec![
1809 "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
1810 "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
1811 ];
1812
1813 for policy_arn in policies {
1814 let _ = self.iam_client.detach_role_policy()
1815 .role_name(role_name)
1816 .policy_arn(policy_arn)
1817 .send()
1818 .await;
1819 }
1820
1821 let inline_policies = vec![
1823 "CloudWatchLogsWriteAccess",
1824 "BedrockCredentialManagement",
1825 "BedrockSNSNotifications"
1826 ];
1827
1828 for policy_name in inline_policies {
1829 let _ = self.iam_client.delete_role_policy()
1830 .role_name(role_name)
1831 .policy_name(policy_name)
1832 .send()
1833 .await;
1834 }
1835
1836 match self.iam_client.delete_role().role_name(role_name).send().await {
1837 Ok(_) => info!("✅ IAM角色删除成功"),
1838 Err(e) => {
1839 let err_str = e.to_string();
1840 if err_str.contains("NoSuchEntity") {
1841 info!("ℹ️ IAM角色不存在");
1842 } else {
1843 warn!("⚠️ IAM角色删除失败 (可能需要先手动清理其他策略): {}", e);
1844 }
1845 }
1846 }
1847
1848 Ok(())
1849 }
1850}
1851
/// Runs deployment, uninstallation and status checks of the Bedrock monitoring
/// stack over a list of AWS regions, one region at a time.
pub struct MultiRegionDeployer {
    /// Region names to operate on; `from_env` fills this from the comma-separated
    /// `AWS_REGION` environment variable.
    regions: Vec<String>,
}
1856
1857impl MultiRegionDeployer {
1858 pub fn from_env() -> Result<Self> {
1860 dotenv::dotenv().ok();
1861
1862 let regions_str = std::env::var("AWS_REGION")
1863 .map_err(|_| anyhow!("❌ 错误: AWS_REGION 必须在.env文件中设置"))?;
1864
1865 let regions: Vec<String> = regions_str
1866 .split(',')
1867 .map(|s| s.trim().to_string())
1868 .filter(|s| !s.is_empty())
1869 .collect();
1870
1871 if regions.is_empty() {
1872 return Err(anyhow!("❌ 错误: AWS_REGION 不能为空"));
1873 }
1874
1875 info!("📍 配置的AWS区域: {}", regions.join(", "));
1876 info!("🌍 总计 {} 个区域", regions.len());
1877
1878 Ok(MultiRegionDeployer { regions })
1879 }
1880
1881 pub async fn deploy_to_all_regions(&self) -> Result<Vec<RegionDeployResult>> {
1883 info!("🚀 开始在所有 {} 个区域部署Bedrock监控系统", self.regions.len());
1884
1885 let zip_content = create_lambda_zip()?;
1886 info!("✅ Lambda函数包创建完成,大小: {} bytes", zip_content.len());
1887
1888 let mut results = Vec::new();
1889 let mut success_count = 0;
1890 let mut failed_regions = Vec::new();
1891
1892 for (index, region) in self.regions.iter().enumerate() {
1893 info!("\n📍 [{}/{}] 部署到区域: {}", index + 1, self.regions.len(), region);
1894 info!("{}", "=".repeat(60));
1895
1896 let start_time = std::time::Instant::now();
1897 let result = self.deploy_to_single_region(region, &zip_content).await;
1898 let duration = start_time.elapsed();
1899
1900 match result {
1901 Ok(deploy_result) => {
1902 success_count += 1;
1903 info!("✅ 区域 {} 部署成功 (耗时: {:?})", region, duration);
1904 results.push(RegionDeployResult {
1905 region: region.clone(),
1906 status: DeployStatus::Success,
1907 duration,
1908 details: deploy_result,
1909 });
1910 }
1911 Err(e) => {
1912 failed_regions.push((region.clone(), anyhow::anyhow!("{}", e)));
1913 error!("❌ 区域 {} 部署失败 (耗时: {:?}): {}", region, duration, e);
1914 results.push(RegionDeployResult {
1915 region: region.clone(),
1916 status: DeployStatus::Failed(e),
1917 duration,
1918 details: String::new(),
1919 });
1920 }
1921 }
1922
1923 if index < self.regions.len() - 1 {
1925 info!("⏳ 等待 3 秒后部署下一个区域...");
1926 tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1927 }
1928 }
1929
1930 self.print_deployment_summary(&results, success_count, &failed_regions).await;
1932
1933 if success_count == 0 {
1934 return Err(anyhow!("❌ 所有区域部署失败"));
1935 } else if failed_regions.len() > 0 {
1936 warn!("⚠️ {} 个区域部署失败,请检查错误信息", failed_regions.len());
1937 }
1938
1939 Ok(results)
1940 }
1941
1942 pub async fn uninstall_from_all_regions(&self) -> Result<Vec<RegionDeployResult>> {
1944 info!("🗑️ 开始在所有 {} 个区域彻底卸载Bedrock监控系统", self.regions.len());
1945
1946 let mut results = Vec::new();
1947 let mut success_count = 0;
1948
1949 for (index, region) in self.regions.iter().enumerate() {
1950 let start_time = std::time::Instant::now();
1951 info!("\n📍 [{}/{}] 正在清理区域: {}", index + 1, self.regions.len(), region);
1952
1953 match AwsManager::new(region).await {
1954 Ok(manager) => {
1955 match manager.uninstall_complete_monitoring_stack().await {
1956 Ok(_) => {
1957 success_count += 1;
1958 info!("✅ 区域 {} 清理成功", region);
1959 results.push(RegionDeployResult {
1960 region: region.clone(),
1961 status: DeployStatus::Success,
1962 duration: start_time.elapsed(),
1963 details: "Full stack uninstalled".to_string(),
1964 });
1965 },
1966 Err(e) => {
1967 error!("❌ 区域 {} 清理失败: {}", region, e);
1968 results.push(RegionDeployResult {
1969 region: region.clone(),
1970 status: DeployStatus::Failed(e),
1971 duration: start_time.elapsed(),
1972 details: String::new(),
1973 });
1974 }
1975 }
1976 },
1977 Err(e) => {
1978 error!("❌ 无法初始化区域 {} 的管理器: {}", region, e);
1979 results.push(RegionDeployResult {
1980 region: region.clone(),
1981 status: DeployStatus::Failed(anyhow::anyhow!(e)),
1982 duration: start_time.elapsed(),
1983 details: String::new(),
1984 });
1985 }
1986 }
1987 }
1988
1989 info!("\n{}", "=".repeat(80));
1990 info!("📊 卸载总结");
1991 info!(" ✅ 成功: {}/{} 区域", success_count, self.regions.len());
1992 info!(" ❌ 失败: {}/{} 区域", self.regions.len() - success_count, self.regions.len());
1993
1994 Ok(results)
1995 }
1996
1997 async fn deploy_to_single_region(&self, region: &str, zip_content: &[u8]) -> Result<String> {
1999 let aws_manager = AwsManager::new(region).await?;
2000
2001 aws_manager.deploy_complete_monitoring_stack(zip_content.to_vec()).await?;
2003
2004 aws_manager.fix_ak_sk_auto_disable().await?;
2006
2007 Ok(format!("区域 {} 部署完成", region))
2008 }
2009
2010 async fn print_deployment_summary(&self, results: &[RegionDeployResult], success_count: usize, failed_regions: &[(String, anyhow::Error)]) {
2012 info!("\n{}", "=".repeat(80));
2013 info!("🎉 多区域部署完成总结");
2014 info!("{}", "=".repeat(80).as_str());
2015
2016 info!("📊 部署统计:");
2017 info!(" ✅ 成功区域: {}/{}", success_count, self.regions.len());
2018 info!(" ❌ 失败区域: {}/{}", failed_regions.len(), self.regions.len());
2019 info!(" ⏱️ 总耗时: {:?}",
2020 results.iter().map(|r| r.duration).sum::<std::time::Duration>());
2021
2022 if success_count > 0 {
2023 info!("\n✅ 成功部署的区域:");
2024 for result in results.iter().filter(|r| matches!(r.status, DeployStatus::Success)) {
2025 info!(" 🌍 {} (耗时: {:?})", result.region, result.duration);
2026 }
2027 }
2028
2029 if !failed_regions.is_empty() {
2030 info!("\n❌ 失败的区域:");
2031 for (region, error) in failed_regions {
2032 info!(" 🌍 {}: {}", region, error);
2033 }
2034 }
2035
2036 if success_count > 0 {
2037 info!("\n🚨 429检测和AK/SK自动关闭机制已激活:");
2038 info!(" 🔍 监控范围: {} 个AWS区域", success_count);
2039 info!(" ⚡ 检测速度: 实时检测429错误");
2040 info!(" 🔒 自动响应: 立即禁用相关AK/SK");
2041 info!(" 📊 告警覆盖: 每个区域8个CloudWatch告警");
2042
2043 info!("\n📋 每个区域部署的组件:");
2044 info!(" ✅ Lambda函数: bedrock-monitor-function");
2045 info!(" ✅ SNS主题: bedrock-throttling-alerts");
2046 info!(" ✅ CloudWatch告警: 8个专业告警");
2047 info!(" ✅ EventBridge规则: API调用监控");
2048 info!(" ✅ 触发器配置: SNS+EventBridge");
2049 info!(" ✅ 日志系统: 标准日志+详细日志");
2050 }
2051
2052 info!("\n🔍 验证建议:");
2053 info!(" 1. 检查各区域的CloudWatch告警状态");
2054 info!(" 2. 查看Lambda函数日志输出");
2055 info!(" 3. 监控Bedrock API调用情况");
2056 info!(" 4. 测试429错误响应机制");
2057 info!(" 5. 验证SNS告警通知系统");
2058 }
2059}
2060
/// Outcome of a deploy or uninstall run against a single region.
#[derive(Debug)]
pub struct RegionDeployResult {
    /// Name of the region the operation ran against.
    pub region: String,
    /// Whether the operation succeeded, and the error if it did not.
    pub status: DeployStatus,
    /// Wall-clock time the operation took for this region.
    pub duration: std::time::Duration,
    /// Free-form success message; left empty for failed regions.
    pub details: String,
}
2069
/// Success/failure marker stored in a `RegionDeployResult`.
#[derive(Debug)]
pub enum DeployStatus {
    /// The operation completed without error.
    Success,
    /// The operation failed; carries the error that stopped it.
    Failed(anyhow::Error),
}
2076
2077impl MultiRegionDeployer {
2079 pub async fn check_deployment_status(&self) -> Result<Vec<RegionStatus>> {
2081 info!("🔍 开始检查所有 {} 个区域的部署状态", self.regions.len());
2082
2083 let mut region_statuses = Vec::new();
2084
2085 for (index, region) in self.regions.iter().enumerate() {
2086 info!("\n📍 [{}/{}] 检查区域: {}", index + 1, self.regions.len(), region);
2087
2088 let status = self.check_single_region_status(region).await;
2089 region_statuses.push(status);
2090
2091 if index < self.regions.len() - 1 {
2093 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
2094 }
2095 }
2096
2097 self.print_status_summary(®ion_statuses).await;
2098 Ok(region_statuses)
2099 }
2100
2101 async fn check_single_region_status(&self, region: &str) -> RegionStatus {
2103 let start_time = std::time::Instant::now();
2104 let mut status = RegionStatus {
2105 region: region.to_string(),
2106 lambda_function: false,
2107 sns_topic: false,
2108 eventbridge_rule: false,
2109 cloudwatch_alarms: 0,
2110 lambda_environment: false,
2111 iam_permissions: false,
2112 total_duration: std::time::Duration::default(),
2113 is_healthy: false,
2114 };
2115
2116 if let Ok(aws_manager) = AwsManager::new(region).await {
2118 status.lambda_function = self.check_lambda_function(&aws_manager).await;
2120
2121 status.sns_topic = self.check_sns_topic(&aws_manager).await;
2123
2124 status.eventbridge_rule = self.check_eventbridge_rule(&aws_manager).await;
2126
2127 status.cloudwatch_alarms = self.count_cloudwatch_alarms(&aws_manager).await;
2129
2130 if status.lambda_function {
2132 status.lambda_environment = self.check_lambda_environment(&aws_manager).await;
2133 }
2134
2135 status.iam_permissions = self.check_iam_permissions(&aws_manager).await;
2137
2138 status.is_healthy = status.lambda_function &&
2140 status.sns_topic &&
2141 status.eventbridge_rule &&
2142 status.cloudwatch_alarms >= 5 && status.lambda_environment &&
2144 status.iam_permissions;
2145 }
2146
2147 status.total_duration = start_time.elapsed();
2148
2149 self.print_region_status(&status).await;
2151
2152 status
2153 }
2154
2155 async fn check_lambda_function(&self, aws_manager: &AwsManager) -> bool {
2157 match aws_manager.lambda_client.get_function_configuration()
2158 .function_name("bedrock-monitor-function")
2159 .send()
2160 .await {
2161 Ok(config) => {
2162 let is_active = matches!(config.state(), Some(aws_sdk_lambda::types::State::Active));
2163 info!(" ✅ Lambda函数: 存在且状态: {:?}", config.state());
2164 is_active
2165 }
2166 Err(_) => {
2167 info!(" ❌ Lambda函数: 不存在或无法访问");
2168 false
2169 }
2170 }
2171 }
2172
2173 async fn check_sns_topic(&self, aws_manager: &AwsManager) -> bool {
2175 match aws_manager.sns_client.list_topics().send().await {
2176 Ok(response) => {
2177 let exists = response.topics()
2178 .iter()
2179 .any(|topic| topic.topic_arn()
2180 .unwrap_or("")
2181 .contains("bedrock-throttling-alerts"));
2182
2183 if exists {
2184 info!(" ✅ SNS主题: bedrock-throttling-alerts 存在");
2185 } else {
2186 info!(" ❌ SNS主题: bedrock-throttling-alerts 不存在");
2187 }
2188 exists
2189 }
2190 Err(_) => {
2191 info!(" ❌ SNS主题: 无法访问");
2192 false
2193 }
2194 }
2195 }
2196
2197 async fn check_eventbridge_rule(&self, aws_manager: &AwsManager) -> bool {
2199 match aws_manager.eventbridge_client.list_rules()
2200 .name_prefix("bedrock-api-monitor-rule")
2201 .send()
2202 .await {
2203 Ok(response) => {
2204 let exists = response.rules()
2205 .iter()
2206 .any(|rule| rule.name() == Some("bedrock-api-monitor-rule"));
2207
2208 if exists {
2209 info!(" ✅ EventBridge规则: bedrock-api-monitor-rule 存在");
2210 } else {
2211 info!(" ❌ EventBridge规则: bedrock-api-monitor-rule 不存在");
2212 }
2213 exists
2214 }
2215 Err(_) => {
2216 info!(" ❌ EventBridge规则: 无法访问");
2217 false
2218 }
2219 }
2220 }
2221
2222 async fn count_cloudwatch_alarms(&self, aws_manager: &AwsManager) -> i32 {
2224 match aws_manager.cloudwatch_client.describe_alarms()
2225 .alarm_name_prefix("bedrock")
2226 .send()
2227 .await {
2228 Ok(response) => {
2229 let count = response.metric_alarms().len() as i32;
2230 info!(" 📊 CloudWatch告警: {} 个", count);
2231 count
2232 }
2233 Err(_) => {
2234 info!(" ❌ CloudWatch告警: 无法访问");
2235 0
2236 }
2237 }
2238 }
2239
2240 async fn check_lambda_environment(&self, aws_manager: &AwsManager) -> bool {
2242 match aws_manager.lambda_client.get_function_configuration()
2243 .function_name("bedrock-monitor-function")
2244 .send()
2245 .await {
2246 Ok(config) => {
2247 if let Some(env) = config.environment() {
2248 if let Some(vars) = env.variables() {
2249 let has_dry_run_false = vars.get("BEDROCK_DRY_RUN_MODE") == Some(&"false".to_string());
2250 let has_auto_disable_true = vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE") == Some(&"true".to_string());
2251
2252 if has_dry_run_false && has_auto_disable_true {
2253 info!(" ✅ Lambda环境变量: 正确配置 (429检测已启用)");
2254 return true;
2255 } else {
2256 info!(" ⚠️ Lambda环境变量: 配置不完整 (DRY_RUN={}, AUTO_DISABLE={})",
2257 vars.get("BEDROCK_DRY_RUN_MODE").unwrap_or(&"未设置".to_string()),
2258 vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE").unwrap_or(&"未设置".to_string()));
2259 }
2260 }
2261 }
2262 info!(" ❌ Lambda环境变量: 未正确配置");
2263 false
2264 }
2265 Err(_) => {
2266 info!(" ❌ Lambda环境变量: 无法检查");
2267 false
2268 }
2269 }
2270 }
2271
2272 async fn check_iam_permissions(&self, aws_manager: &AwsManager) -> bool {
2274 match aws_manager.iam_client.get_role()
2275 .role_name("lambda-bedrock-monitor-role")
2276 .send()
2277 .await {
2278 Ok(_) => {
2279 info!(" ✅ IAM权限: lambda-bedrock-monitor-role 存在");
2280 true
2281 }
2282 Err(_) => {
2283 info!(" ❌ IAM权限: lambda-bedrock-monitor-role 不存在");
2284 false
2285 }
2286 }
2287 }
2288
2289 async fn print_region_status(&self, status: &RegionStatus) {
2291 let health_emoji = if status.is_healthy { "✅" } else { "❌" };
2292 info!("{} 区域 {} (耗时: {:?})", health_emoji, status.region, status.total_duration);
2293
2294 info!(" - Lambda函数: {}", if status.lambda_function { "✅" } else { "❌" });
2295 info!(" - SNS主题: {}", if status.sns_topic { "✅" } else { "❌" });
2296 info!(" - EventBridge规则: {}", if status.eventbridge_rule { "✅" } else { "❌" });
2297 info!(" - CloudWatch告警: {} 个 {}", if status.cloudwatch_alarms >= 5 { "✅" } else { "⚠️" }, status.cloudwatch_alarms);
2298 info!(" - 环境变量: {}", if status.lambda_environment { "✅" } else { "❌" });
2299 info!(" - IAM权限: {}", if status.iam_permissions { "✅" } else { "❌" });
2300
2301 if status.is_healthy {
2302 info!(" 🚨 429 throttling限制: ✅ 已激活");
2303 } else {
2304 info!(" 🚨 429 throttling限制: ❌ 未激活");
2305 }
2306 }
2307
2308 async fn print_status_summary(&self, region_statuses: &[RegionStatus]) {
2310 info!("\n{}", "=".repeat(80));
2311 info!("🎉 多区域部署状态检查完成");
2312 info!("{}", "=".repeat(80));
2313
2314 let healthy_count = region_statuses.iter().filter(|r| r.is_healthy).count();
2315 let total_count = region_statuses.len();
2316
2317 info!("📊 部署状态统计:");
2318 info!(" ✅ 完全健康的区域: {}/{}", healthy_count, total_count);
2319 info!(" ❌ 有问题的区域: {}/{}", total_count - healthy_count, total_count);
2320
2321 let lambda_count = region_statuses.iter().filter(|r| r.lambda_function).count();
2323 let sns_count = region_statuses.iter().filter(|r| r.sns_topic).count();
2324 let eventbridge_count = region_statuses.iter().filter(|r| r.eventbridge_rule).count();
2325 let env_count = region_statuses.iter().filter(|r| r.lambda_environment).count();
2326 let iam_count = region_statuses.iter().filter(|r| r.iam_permissions).count();
2327
2328 info!("\n📋 组件部署统计:");
2329 info!(" ✅ Lambda函数: {}/{} 区域", lambda_count, total_count);
2330 info!(" ✅ SNS主题: {}/{} 区域", sns_count, total_count);
2331 info!(" ✅ EventBridge规则: {}/{} 区域", eventbridge_count, total_count);
2332 info!(" ✅ Lambda环境变量: {}/{} 区域", env_count, total_count);
2333 info!(" ✅ IAM权限: {}/{} 区域", iam_count, total_count);
2334
2335 if healthy_count > 0 {
2337 info!("\n✅ 完全部署成功的区域:");
2338 for status in region_statuses.iter().filter(|r| r.is_healthy) {
2339 info!(" 🌍 {} (耗时: {:?})", status.region, status.total_duration);
2340 }
2341 }
2342
2343 if healthy_count < total_count {
2345 info!("\n❌ 需要修复的区域:");
2346 for status in region_statuses.iter().filter(|r| !r.is_healthy) {
2347 let issues = vec![
2348 (!status.lambda_function, "Lambda函数"),
2349 (!status.sns_topic, "SNS主题"),
2350 (!status.eventbridge_rule, "EventBridge规则"),
2351 (!status.lambda_environment, "环境变量"),
2352 (!status.iam_permissions, "IAM权限"),
2353 ].into_iter()
2354 .filter_map(|(missing, component)| if missing { Some(component) } else { None })
2355 .collect::<Vec<_>>();
2356
2357 info!(" 🌍 {}: 缺少 {}", status.region, issues.join(", "));
2358 }
2359 }
2360
2361 if healthy_count > 0 {
2362 info!("\n🚨 429 throttling限制状态:");
2363 info!(" ✅ 已激活区域: {}/{}", healthy_count, total_count);
2364 info!(" ⚡ 检测机制: 实时检测429错误");
2365 info!(" 🔒 自动响应: 立即禁用相关AK/SK");
2366
2367 info!("\n💡 验证建议:");
2368 info!(" 1. 在成功部署的区域测试Bedrock API调用");
2369 info!(" 2. 检查CloudWatch告警是否正常触发");
2370 info!(" 3. 验证SNS告警通知系统");
2371 info!(" 4. 测试429错误AK/SK自动禁用功能");
2372 }
2373 }
2374}
2375
/// Outcome of the deployment status check for a single AWS region.
///
/// Each boolean records whether the corresponding component was found during
/// the check; the summary report lists every component whose flag is `false`
/// as missing for that region.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegionStatus {
    /// Name of the region this status describes, as printed in reports.
    pub region: String,
    /// Whether the Lambda function was found.
    pub lambda_function: bool,
    /// Whether the SNS topic was found.
    pub sns_topic: bool,
    /// Whether the EventBridge rule was found.
    pub eventbridge_rule: bool,
    /// Number of CloudWatch alarms found; the status report marks the region
    /// with a warning when this is below 5.
    pub cloudwatch_alarms: i32,
    /// Whether the Lambda environment variables check passed.
    pub lambda_environment: bool,
    /// Whether the IAM permissions check passed.
    pub iam_permissions: bool,
    /// Duration reported next to the region in the summary
    /// (presumably the time the check took — confirm against the producer).
    pub total_duration: std::time::Duration,
    /// Overall verdict for the region; reports treat a healthy region as
    /// having 429 throttling protection active.
    pub is_healthy: bool,
}
2389
2390pub fn create_lambda_zip() -> Result<Vec<u8>> {
2391 use std::io::{Cursor, Write};
2392 use zip::write::FileOptions;
2393
2394 let mut buffer = Vec::new();
2395 {
2396 let mut zip = zip::ZipWriter::new(Cursor::new(&mut buffer));
2397 let options = FileOptions::default()
2398 .compression_method(zip::CompressionMethod::Deflated);
2399
2400 let python_content = r#"import json
2402import os
2403import time
2404import re
2405import hashlib
2406from datetime import datetime
2407
2408# Redis connection setup (已移除,现在使用CloudWatch日志)
2409# REDIS_HOST = os.environ.get('REDIS_HOST', 'localhost')
2410# REDIS_PORT = int(os.environ.get('REDIS_PORT', 6379))
2411# REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD', None)
2412
2413# CloudWatch Logs setup
2414CLOUDWATCH_LOG_GROUP = '/aws/lambda/bedrock-monitor-function/detailed'
2415CLOUDWATCH_LOG_STREAM = f"detailed-logs-{int(time.time())}"
2416
2417# CloudWatch Logs client
2418try:
2419 import boto3
2420 cloudwatch_logs = boto3.client('logs')
2421 CLOUDWATCH_AVAILABLE = True
2422except ImportError:
2423 CLOUDWATCH_AVAILABLE = False
2424 print("Warning: boto3 not available, CloudWatch logging disabled")
2425
2426def get_log_stream_name():
2427 """Generate unique log stream name"""
2428 return CLOUDWATCH_LOG_STREAM
2429
2430def ensure_log_group_and_stream():
2431 """Ensure log group and stream exist"""
2432 if not CLOUDWATCH_AVAILABLE:
2433 return
2434
2435 try:
2436 # Create log group if it doesn't exist
2437 try:
2438 cloudwatch_logs.create_log_group(logGroupName=CLOUDWATCH_LOG_GROUP)
2439 print(f"Created CloudWatch log group: {CLOUDWATCH_LOG_GROUP}")
2440 except cloudwatch_logs.exceptions.ResourceAlreadyExistsException:
2441 pass # Log group already exists
2442
2443 # Create log stream
2444 log_stream_name = get_log_stream_name()
2445 try:
2446 cloudwatch_logs.create_log_stream(
2447 logGroupName=CLOUDWATCH_LOG_GROUP,
2448 logStreamName=log_stream_name
2449 )
2450 print(f"Created CloudWatch log stream: {log_stream_name}")
2451 except cloudwatch_logs.exceptions.ResourceAlreadyExistsException:
2452 pass # Log stream already exists
2453
2454 except Exception as e:
2455 print(f"Error setting up CloudWatch Logs: {e}")
2456
2457
2458def sanitize_data(data, level='partial'):
2459 """Sanitize sensitive data from request/response content"""
2460 if not isinstance(data, (dict, str)):
2461 return data
2462
2463 if isinstance(data, str):
2464 # Remove potential API keys, tokens, and sensitive patterns
2465 sanitized = data
2466
2467 # Remove AWS access keys
2468 sanitized = re.sub(r'AKIA[0-9A-Z]{16}', 'AKIAXXXXXXXXXXXXXXXX', sanitized)
2469
2470 # Remove potential tokens and secrets
2471 sanitized = re.sub(r'[Bb]earer\s+[A-Za-z0-9\-._~+\/]+=*', 'bearer XXXXXXXX', sanitized)
2472 sanitized = re.sub(r'[Tt]oken\s*[:=]\s*[A-Za-z0-9\-._~+\/]+=*', 'token: XXXXXXXX', sanitized)
2473
2474 # Remove email addresses
2475 sanitized = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', 'email@domain.com', sanitized)
2476
2477 # Remove phone numbers (basic pattern)
2478 sanitized = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', 'XXX-XXX-XXXX', sanitized)
2479
2480 # Remove credit card numbers (basic pattern)
2481 sanitized = re.sub(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', 'XXXX-XXXX-XXXX-XXXX', sanitized)
2482
2483 # Remove IP addresses for privacy (optional, based on level)
2484 if level == 'full':
2485 sanitized = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', 'X.X.X.X', sanitized)
2486
2487 return sanitized
2488
2489 if isinstance(data, dict):
2490 sanitized_dict = {}
2491 for key, value in data.items():
2492 # Check for sensitive key patterns
2493 if any(pattern in key.lower() for pattern in
2494 ['password', 'token', 'key', 'secret', 'auth', 'credential', 'session']):
2495 sanitized_dict[key] = 'XXXXXXXX'
2496 elif any(pattern in key.lower() for pattern in ['email', 'mail']):
2497 sanitized_dict[key] = 'email@domain.com'
2498 elif key.lower() in ['client_ip', 'source_ip'] and level == 'full':
2499 sanitized_dict[key] = 'X.X.X.X'
2500 else:
2501 sanitized_dict[key] = sanitize_data(value, level)
2502 return sanitized_dict
2503
2504 return data
2505
2506def should_log_detailed_content(status_code, event_type='unknown'):
2507 """Determine if we should log detailed content based on status and event type"""
2508 # Always log errors and throttling in detail
2509 if status_code in [429, 500, 502, 503, 504]:
2510 return 'full'
2511
2512 # Log SNS alarms in detail
2513 if event_type == 'CloudWatchAlarm':
2514 return 'full'
2515
2516 # For successful requests, log summary only
2517 if status_code == 200:
2518 return 'summary'
2519
2520 # Default to partial for other cases
2521 return 'partial'
2522
2523def log_to_cloudwatch_detailed(log_entry, log_level='INFO'):
2524 """Send detailed log entry to CloudWatch Logs"""
2525 if not CLOUDWATCH_AVAILABLE:
2526 print(f"CloudWatch unavailable, logging to console: {log_level} - {json.dumps(log_entry)}")
2527 return
2528
2529 try:
2530 timestamp = int(time.time() * 1000)
2531
2532 log_event = {
2533 'timestamp': timestamp,
2534 'message': json.dumps(log_entry, default=str)
2535 }
2536
2537 cloudwatch_logs.put_log_events(
2538 logGroupName=CLOUDWATCH_LOG_GROUP,
2539 logStreamName=get_log_stream_name(),
2540 logEvents=[log_event]
2541 )
2542
2543 except Exception as e:
2544 print(f"Failed to send to CloudWatch Logs: {e}")
2545 # Fallback to console logging
2546 print(f"{log_level} - {json.dumps(log_entry, default=str)}")
2547
2548def create_detailed_log_entry(event_data, request_data, response_data, status_code, log_level='summary'):
2549 """Create a detailed log entry with appropriate level of detail"""
2550 timestamp = datetime.now().isoformat()
2551
2552 # Base log entry with always-included fields
2553 log_entry = {
2554 'timestamp': timestamp,
2555 'status_code': status_code,
2556 'log_level': log_level,
2557 'request_id': request_data.get('request_id', 'unknown'),
2558 'model': request_data.get('model', 'unknown'),
2559 'client_ip': request_data.get('client_ip', 'unknown'),
2560 'aws_region': request_data.get('aws_region', 'unknown'),
2561 'event_type': event_data.get('event_type', 'unknown')
2562 }
2563
2564 # Add detailed information based on log level
2565 if log_level == 'full':
2566 # Include complete sanitized data
2567 log_entry.update({
2568 'full_request': sanitize_data(request_data, 'partial'),
2569 'full_response': sanitize_data(response_data, 'partial'),
2570 'error_details': request_data.get('error_details', {}),
2571 'user_agent': request_data.get('user_agent', 'unknown'),
2572 'caller_identity': request_data.get('caller_identity', {}),
2573 'request_parameters': sanitize_data(request_data.get('request_parameters', {}), 'partial'),
2574 'response_elements': sanitize_data(response_data.get('response_elements', {}), 'partial')
2575 })
2576 elif log_level == 'summary':
2577 # Include summary information only
2578 log_entry.update({
2579 'api_action': request_data.get('api_action', 'unknown'),
2580 'processing_time_ms': request_data.get('processing_time_ms', 0),
2581 'content_type': response_data.get('content_type', 'unknown'),
2582 'error_code': request_data.get('error_code', None)
2583 })
2584 else: # partial
2585 # Include some detail but sanitize heavily
2586 log_entry.update({
2587 'api_action': request_data.get('api_action', 'unknown'),
2588 'partial_request': sanitize_data(request_data, 'full'),
2589 'error_code': request_data.get('error_code', None)
2590 })
2591
2592 return log_entry
2593
2594def handle_throttle_event(request_data):
2595 """处理429错误事件,立即禁用AK/SK(简化版,无Redis依赖)"""
2596 try:
2597 import boto3
2598 import os
2599
2600 # 检查是否为试运行模式
2601 dry_run_mode = os.environ.get('BEDROCK_DRY_RUN_MODE', 'false').lower() == 'true'
2602
2603 # 提取关键信息
2604 timestamp = request_data.get('timestamp', int(datetime.now().timestamp()))
2605 request_id = request_data.get('request_id', 'unknown')
2606 model_id = request_data.get('model', 'unknown')
2607 client_ip = request_data.get('client_ip', 'unknown')
2608 aws_region = request_data.get('aws_region', 'unknown')
2609
2610 # 从调用者身份中提取AK/SK信息
2611 caller_identity = request_data.get('caller_identity', {})
2612 username = caller_identity.get('userName', 'unknown')
2613 access_key_id = caller_identity.get('accessKeyId', 'unknown')
2614
2615 # 如果没有找到AK/SK,尝试从事件详情中获取
2616 if not access_key_id or access_key_id == 'unknown':
2617 event_details = request_data.get('event_details', {})
2618 access_key_id = event_details.get('accessKeyId', 'unknown')
2619 if not access_key_id or access_key_id == 'unknown':
2620 username = event_details.get('userName', 'unknown')
2621 access_key_id = event_details.get('accessKeyId', 'unknown')
2622
2623 print(f"🚨 429错误检测:")
2624 print(f" - 时间: {datetime.fromtimestamp(timestamp).isoformat()}")
2625 print(f" - 用户: {username}")
2626 print(f" - AK/SK: {access_key_id}")
2627 print(f" - 模型: {model_id}")
2628 print(f" - 源IP: {client_ip}")
2629 print(f" - 区域: {aws_region}")
2630 print(f" - 模式: {'试运行' if dry_run_mode else '正式运行'}")
2631
2632 # 创建详细的事件记录
2633 disable_record = {
2634 "timestamp": datetime.fromtimestamp(timestamp).isoformat(),
2635 "username": username,
2636 "access_key_id": access_key_id,
2637 "model_id": model_id,
2638 "source_ip": client_ip,
2639 "request_id": request_id,
2640 "aws_region": aws_region,
2641 "event_type": "throttle_immediate_disable",
2642 "dry_run": dry_run_mode,
2643 "action_reason": "429_rate_limit_error"
2644 }
2645
2646 # 记录到CloudWatch日志
2647 print(f"📝 429错误记录: {json.dumps(disable_record)}")
2648
2649 # 如果无法获取必要信息,跳过禁用操作
2650 if access_key_id == 'unknown' or username == 'unknown':
2651 print(f"⚠️ 无法获取有效的用户或AK/SK信息,跳过禁用操作")
2652 return False
2653
2654 # 立即禁用AK/SK(无需Redis检查)
2655 if not dry_run_mode:
2656 try:
2657 # 初始化IAM客户端
2658 iam_client = boto3.client('iam')
2659
2660 # 调用IAM禁用访问密钥
2661 response = iam_client.update_access_key(
2662 UserName=username,
2663 AccessKeyId=access_key_id,
2664 Status='Inactive'
2665 )
2666
2667 print(f"✅ 成功禁用AK/SK: {access_key_id}")
2668 print(f"🔒 用户 {username} 的访问密钥已被禁用")
2669
2670 # 更新记录
2671 disable_record.update({
2672 "action": "access_key_disabled",
2673 "status": "success",
2674 "iam_response": str(response)
2675 })
2676
2677 except Exception as iam_error:
2678 print(f"❌ IAM禁用AK/SK失败: {iam_error}")
2679
2680 # 记录失败信息
2681 disable_record.update({
2682 "action": "access_key_disable_failed",
2683 "status": "error",
2684 "error": str(iam_error)
2685 })
2686
2687 return False
2688 else:
2689 print(f"🧪 试运行模式:AK/SK {access_key_id} 将被禁用(未实际执行)")
2690 disable_record["action"] = "dry_run_would_disable"
2691
2692 # 最终记录到CloudWatch
2693 print(f"📋 最终操作记录: {json.dumps(disable_record)}")
2694
2695 return True
2696
2697 except Exception as e:
2698 print(f"❌ 处理429错误时发生异常: {e}")
2699 return False
2700
2701
2702def record_detailed_request(request_data):
2703 """记录详细的请求信息到CloudWatch日志(简化版,无Redis依赖)"""
2704 try:
2705 timestamp = request_data.get('timestamp', int(datetime.now().timestamp()))
2706 status = request_data.get('status_code', 0)
2707 model = request_data.get('model', 'unknown')
2708 client_ip = request_data.get('client_ip', 'unknown')
2709 request_id = request_data.get('request_id', 'unknown')
2710
2711 # 创建日志记录
2712 log_entry = {
2713 "timestamp": datetime.fromtimestamp(timestamp).isoformat(),
2714 "status_code": status,
2715 "model": model,
2716 "client_ip": client_ip,
2717 "request_id": request_id,
2718 "api_action": request_data.get('api_action', 'unknown'),
2719 "aws_region": request_data.get('aws_region', 'unknown'),
2720 "user_agent": request_data.get('user_agent', 'unknown'),
2721 "log_type": "bedrock_api_call"
2722 }
2723
2724 # 记录到CloudWatch日志
2725 print(f"📊 Bedrock API调用记录: {json.dumps(log_entry)}")
2726
2727 # 如果是429错误,立即处理AK/SK禁用
2728 if status == 429:
2729 handle_throttle_event(request_data)
2730
2731 return True
2732
2733 except Exception as e:
2734 print(f"❌ 记录请求信息失败: {e}")
2735 return False
2736
2737def record_basic_status(status_code, timestamp):
2738 """记录基本状态到CloudWatch日志(替代Redis记录)"""
2739 try:
2740 # 记录基本计数到CloudWatch日志
2741 log_entry = {
2742 "timestamp": datetime.fromtimestamp(timestamp).isoformat(),
2743 "status_code": status_code,
2744 "log_type": "bedrock_status_count"
2745 }
2746
2747 print(f"📈 Bedrock状态计数: {json.dumps(log_entry)}")
2748 return True
2749
2750 except Exception as e:
2751 print(f"❌ 记录状态失败: {e}")
2752 return False
2753
2754def extract_bedrock_details_from_event(event):
2755 """从EventBridge CloudTrail事件中提取详细的请求和响应信息"""
2756 try:
2757 if 'detail' not in event:
2758 return None
2759
2760 detail = event['detail']
2761
2762 # 提取请求信息
2763 request_params = detail.get('requestParameters', {})
2764 response_elements = detail.get('responseElements', {})
2765 user_identity = detail.get('userIdentity', {})
2766
2767 # 增强的429错误检测逻辑 - 处理responseElements可能为None的情况
2768 status_code = response_elements.get('httpStatusCode', 0) if response_elements else 0
2769 error_code = detail.get('errorCode', '')
2770 response_error = response_elements.get('error', '') if response_elements else ''
2771
2772 # 检查多个字段判断是否为429错误
2773 is_throttled = (
2774 status_code == 429 or
2775 error_code in ['ThrottlingException', 'ServiceQuotaExceededException'] or
2776 (isinstance(response_error, dict) and response_error.get('code') in ['ThrottlingException', 'ServiceQuotaExceededException']) or
2777 (isinstance(response_error, str) and ('ThrottlingException' in response_error or 'ServiceQuotaExceededException' in response_error))
2778 )
2779
2780 # 如果检测到429相关错误,强制设置status_code为429
2781 if is_throttled:
2782 status_code = 429
2783 print(f"🚨 检测到429/限流错误: status={status_code}, errorCode={error_code}, responseError={response_error}")
2784
2785 request_data = {
2786 'timestamp': int(datetime.now().timestamp()),
2787 'event_time': detail.get('eventTime', ''),
2788 'request_id': response_elements.get('requestId', detail.get('requestID', 'unknown')) if response_elements else detail.get('requestID', 'unknown'),
2789 'model': request_params.get('modelId', 'unknown'),
2790 'api_action': detail.get('eventName', 'unknown'),
2791 'aws_region': detail.get('awsRegion', ''),
2792 'status_code': status_code, # 使用增强检测后的状态码
2793 'client_ip': detail.get('sourceIPAddress', 'unknown'),
2794 'user_agent': detail.get('userAgent', 'unknown'),
2795 'error_code': error_code,
2796 'content_type': response_elements.get('contentType', None) if response_elements else None,
2797 'caller_identity': {
2798 'account_id': user_identity.get('accountId', ''),
2799 'principal_id': user_identity.get('principalId', ''),
2800 'type': user_identity.get('type', ''),
2801 'arn': user_identity.get('arn', ''),
2802 'userName': user_identity.get('userName', ''),
2803 'accessKeyId': user_identity.get('accessKeyId', '')
2804 },
2805 'request_parameters': {
2806 'model_id': request_params.get('modelId', ''),
2807 # 注意:实际请求内容不会记录在CloudTrail中,只记录参数
2808 },
2809 'response_elements': {
2810 'http_status': response_elements.get('httpStatusCode', 0) if response_elements else 0,
2811 'content_type': response_elements.get('contentType', '') if response_elements else '',
2812 'request_id': response_elements.get('requestId', '') if response_elements else '',
2813 'error': response_elements.get('error', None) if response_elements else None
2814 },
2815 'raw_error_code': error_code,
2816 'raw_response_error': response_error,
2817 'is_throttled': is_throttled
2818 }
2819
2820 print(f"📋 提取的请求数据: status_code={status_code}, is_throttled={is_throttled}")
2821 print(f"📊 调试信息: errorCode={error_code}, responseError={response_error}")
2822
2823 return request_data
2824
2825 except Exception as e:
2826 print(f"❌ 从EventBridge事件提取详情时出错: {e}")
2827 return None
2828
2829def extract_bedrock_status_from_event(event):
2830 """Extract status code from EventBridge CloudTrail event (backwards compatibility)"""
2831 try:
2832 request_data = extract_bedrock_details_from_event(event)
2833 if request_data:
2834 return request_data['status_code']
2835 return None
2836 except Exception as e:
2837 print(f"Error extracting status from EventBridge event: {e}")
2838 return 200
2839
2840def handle_sns_alarm(event):
2841 """Handle CloudWatch alarm notifications from SNS"""
2842 try:
2843 print(f"🚨 处理CloudWatch SNS告警事件: {json.dumps(event)}")
2844
2845 # Extract SNS message
2846 if 'Records' not in event:
2847 print("❌ SNS事件中没有找到记录")
2848 return None
2849
2850 alarm_count = 0
2851 for record in event['Records']:
2852 if 'Sns' not in record:
2853 continue
2854
2855 sns_message = json.loads(record['Sns']['Message'])
2856 print(f"📋 SNS消息: {json.dumps(sns_message)}")
2857
2858 # Extract CloudWatch alarm information
2859 alarm_name = sns_message.get('AlarmName', 'unknown')
2860 alarm_arn = sns_message.get('AlarmArn', 'unknown')
2861 state_value = sns_message.get('NewStateValue', 'unknown')
2862 state_reason = sns_message.get('NewStateReason', 'unknown')
2863
2864 # Check if this is a 429-related alarm
2865 is_429_alarm = (
2866 '429' in alarm_name or
2867 'throttle' in alarm_name.lower() or
2868 'InvocationThrottles' in alarm_name or
2869 'Bedrock' in alarm_name
2870 )
2871
2872 # Extract alarm configuration details
2873 trigger_info = sns_message.get('Trigger', {})
2874 metric_name = trigger_info.get('MetricName', 'unknown')
2875 namespace = trigger_info.get('Namespace', 'unknown')
2876 threshold = trigger_info.get('Threshold', 0)
2877
2878 print(f"🔔 CloudWatch告警详情:")
2879 print(f" - 名称: {alarm_name}")
2880 print(f" - 状态: {state_value}")
2881 print(f" - 原因: {state_reason}")
2882 print(f" - 指标: {metric_name}")
2883 print(f" - 命名空间: {namespace}")
2884 print(f" - 阈值: {threshold}")
2885 print(f" - 是否429相关: {is_429_alarm}")
2886
2887 # Create alarm data for logging
2888 alarm_data = {
2889 'timestamp': datetime.now().isoformat(),
2890 'alarm_name': alarm_name,
2891 'alarm_arn': alarm_arn,
2892 'state_value': state_value,
2893 'state_reason': state_reason,
2894 'region': sns_message.get('Region', os.environ.get('AWS_REGION', '')),
2895 'event_type': 'CloudWatchAlarm',
2896 'is_429_related': is_429_alarm,
2897 'alarm_configuration': {
2898 'metric': metric_name,
2899 'namespace': namespace,
2900 'threshold': threshold,
2901 'comparison_operator': trigger_info.get('ComparisonOperator', 'unknown'),
2902 'evaluation_periods': trigger_info.get('EvaluationPeriods', 0)
2903 },
2904 'dimensions': trigger_info.get('Dimensions', []),
2905 'raw_sns_message': sns_message
2906 }
2907
2908 # Log to CloudWatch
2909 print(f"📝 记录CloudWatch告警: {json.dumps(alarm_data)}")
2910
2911 # If this is a 429 alarm in ALARM state, trigger immediate action
2912 if is_429_alarm and state_value == 'ALARM':
2913 print(f"🚨 检测到429相关告警触发!")
2914 print(f" - 告警名称: {alarm_name}")
2915 print(f" - 阈值: {threshold} 次")
2916 print(f" - 状态: {state_value}")
2917
2918 # Create a mock 429 event to trigger the same logic as direct 429 detection
2919 mock_429_event = {
2920 "timestamp": datetime.now().isoformat(),
2921 "event_type": "SNS_Alarm_429_Detection",
2922 "alarm_name": alarm_name,
2923 "metric_value": threshold,
2924 "threshold": threshold,
2925 "state_reason": state_reason,
2926 "trigger_source": "CloudWatch_Metric_Alarm"
2927 }
2928
2929 # Handle this as a 429 event (will trigger AK/SK disabling)
2930 handle_sns_triggered_429(mock_429_event)
2931
2932 alarm_count += 1
2933
2934 print(f"✅ 成功处理了 {alarm_count} 个SNS告警事件")
2935
2936 return {
2937 'status': 'success',
2938 'message': f'SNS告警处理成功,共处理 {alarm_count} 个告警',
2939 'alarms_processed': alarm_count,
2940 'recorded': True
2941 }
2942
2943 except Exception as e:
2944 print(f"❌ 处理SNS告警时出错: {e}")
2945 return {
2946 'status': 'error',
2947 'message': f'处理SNS告警失败: {str(e)}',
2948 'recorded': False
2949 }
2950
2951def handle_sns_triggered_429(alarm_event):
2952 """处理由SNS告警触发的429检测"""
2953 try:
2954 print(f"🔥 处理SNS告警触发的429事件: {json.dumps(alarm_event)}")
2955
2956 # Since this is a CloudWatch alarm, we don't have specific user/AK/SK info
2957 # But we can still log and potentially implement account-level policies
2958
2959 alarm_name = alarm_event.get('alarm_name', 'unknown')
2960 metric_value = alarm_event.get('metric_value', 0)
2961 threshold = alarm_event.get('threshold', 0)
2962
2963 # Create a log entry
2964 log_entry = {
2965 'timestamp': datetime.now().isoformat(),
2966 'event_type': 'SNS_Alarm_429_Response',
2967 'alarm_name': alarm_name,
2968 'metric_value': metric_value,
2969 'threshold': threshold,
2970 'state_reason': alarm_event.get('state_reason', 'CloudWatch alarm triggered'),
2971 'trigger_source': alarm_event.get('trigger_source', 'CloudWatch'),
2972 'action_taken': 'Alarm logged and monitored',
2973 'recommendation': 'Check recent Bedrock API usage and consider rate limiting'
2974 }
2975
2976 print(f"📝 SNS触发的429日志: {json.dumps(log_entry)}")
2977
2978 # Note: Since SNS alarms don't contain specific AK/SK information,
2979 # we can't disable specific credentials here.
2980 # However, we can implement account-level monitoring and policies
2981
2982 print(f"💡 注意: SNS告警不包含具体的AK/SK信息")
2983 print(f" - 建议检查CloudTrail日志获取具体的用户信息")
2984 print(f" - 考虑实施账户级别的限流策略")
2985
2986 return True
2987
2988 except Exception as e:
2989 print(f"❌ 处理SNS触发的429事件时出错: {e}")
2990 return False
2991
2992
2993def lambda_handler(event, context):
2994 try:
2995 print(f"Received event: {json.dumps(event)}")
2996
2997 # Initialize CloudWatch Logs if available
2998 if CLOUDWATCH_AVAILABLE:
2999 ensure_log_group_and_stream()
3000
3001 # Check if this is an SNS event (CloudWatch alarm)
3002 if 'Records' in event and len(event['Records']) > 0 and 'Sns' in event['Records'][0]:
3003 print("Detected SNS event, processing as CloudWatch alarm")
3004 sns_result = handle_sns_alarm(event)
3005 return {
3006 'statusCode': 200,
3007 'body': json.dumps(sns_result),
3008 'headers': {'Content-Type': 'application/json'}
3009 }
3010
3011 # 尝试从EventBridge事件中提取详细信息
3012 request_data = extract_bedrock_details_from_event(event)
3013 status_code = None
3014
3015 if request_data:
3016 status_code = request_data['status_code']
3017 print(f"Extracted from EventBridge - Status: {status_code}, Model: {request_data['model']}, Client IP: {request_data['client_ip']}")
3018 else:
3019 # 如果不是EventBridge事件,使用直接调用数据
3020 status_code = event.get('status_code', 200)
3021 request_data = {
3022 'timestamp': int(datetime.now().timestamp()),
3023 'request_id': event.get('request_id', context.aws_request_id if context else 'unknown'),
3024 'model': event.get('model_id', 'unknown'),
3025 'api_action': 'DirectInvoke',
3026 'aws_region': os.environ.get('AWS_REGION', ''),
3027 'status_code': status_code,
3028 'client_ip': event.get('client_ip', 'lambda-direct'),
3029 'user_agent': event.get('user_agent', 'lambda-function'),
3030 'error_code': event.get('error', None),
3031 'content_type': None,
3032 'caller_identity': {
3033 'account_id': '',
3034 'principal_id': 'lambda',
3035 'type': 'Lambda',
3036 'arn': context.invoked_function_arn if context else ''
3037 },
3038 'request_parameters': event.get('request_parameters', {}),
3039 'response_elements': event.get('response_elements', {})
3040 }
3041 print(f"Using direct call data - Status: {status_code}, Model: {request_data['model']}")
3042
3043 # Log the status (this goes to CloudWatch)
3044 print(f"Bedrock API Status: {status_code}")
3045
3046 # Determine logging level based on status and event type
3047 event_type = "CloudWatchAlarm" if 'Records' in event and 'Sns' in event.get('Records', [{}])[0] else ("EventBridge" if 'detail' in event else "Direct")
3048 log_level = should_log_detailed_content(status_code, event_type)
3049
3050 # Create response data for logging
3051 response_data = {
3052 'content_type': request_data.get('content_type'),
3053 'response_elements': request_data.get('response_elements', {}),
3054 'processing_time_ms': request_data.get('processing_time_ms', 0)
3055 }
3056
3057 # 记录详细信息(扩展到所有状态码,但使用不同的日志级别)
3058 print(f"Recording information for status {status_code} with log level: {log_level}")
3059
3060 # 记录到CloudWatch日志(简化版,无Redis依赖)
3061 if status_code in [200, 429]:
3062 record_detailed_request(request_data)
3063 # 记录基本计数(替代Redis)
3064 timestamp = int(datetime.now().timestamp())
3065 record_basic_status(status_code, timestamp)
3066 else:
3067 # 记录基本计数
3068 timestamp = int(datetime.now().timestamp())
3069 record_basic_status(status_code, timestamp)
3070
3071 # 🔥 关键修复:检测429错误并禁用AK/SK
3072 if status_code in [429] or request_data.get('error_code') in ['ThrottlingException', 'ServiceQuotaExceededException']:
3073 print(f"🚨 检测到429/Throttling错误!状态码: {status_code}, 错误码: {request_data.get('error_code')}")
3074 try:
3075 handle_throttle_event(request_data)
3076 except Exception as e:
3077 print(f"❌ 处理429事件失败: {e}")
3078 elif status_code == 200 and request_data.get('error_code') in ['ThrottlingException', 'ServiceQuotaExceededException']:
3079 print(f"🚨 检测到状态码200但包含Throttling错误!错误码: {request_data.get('error_code')}")
3080 try:
3081 handle_throttle_event(request_data)
3082 except Exception as e:
3083 print(f"❌ 处理Throttling事件失败: {e}")
3084
3085 # Log to CloudWatch Logs with appropriate detail level
3086 try:
3087 detailed_log_entry = create_detailed_log_entry(
3088 event_data={'event_type': event_type, 'raw_event': sanitize_data(event, 'full')},
3089 request_data=request_data,
3090 response_data=response_data,
3091 status_code=status_code,
3092 log_level=log_level
3093 )
3094
3095 log_to_cloudwatch_detailed(detailed_log_entry, 'INFO' if status_code == 200 else 'ERROR')
3096 print(f"Logged to CloudWatch with level: {log_level}")
3097
3098 except Exception as e:
3099 print(f"Failed to log to CloudWatch: {e}")
3100
3101 # Create appropriate response based on status code
3102 if status_code == 200:
3103 response_body = {
3104 "status": "success",
3105 "message": "Bedrock request completed successfully",
3106 "recorded": True,
3107 "event_type": "EventBridge" if 'detail' in event else "Direct",
3108 "request_id": request_data['request_id'],
3109 "model": request_data['model'],
3110 "client_ip": request_data['client_ip']
3111 }
3112 elif status_code == 429:
3113 response_body = {
3114 "status": "rate_limit",
3115 "message": "Bedrock rate limit exceeded",
3116 "recorded": True,
3117 "event_type": "EventBridge" if 'detail' in event else "Direct",
3118 "request_id": request_data['request_id'],
3119 "model": request_data['model'],
3120 "client_ip": request_data['client_ip']
3121 }
3122 elif status_code == 400:
3123 response_body = {
3124 "status": "bad_request",
3125 "message": "Bedrock bad request",
3126 "recorded": False, # Not recorded per requirement
3127 "event_type": "EventBridge" if 'detail' in event else "Direct"
3128 }
3129 elif status_code == 500:
3130 response_body = {
3131 "status": "server_error",
3132 "message": "Bedrock server error",
3133 "recorded": False, # Not recorded per requirement
3134 "event_type": "EventBridge" if 'detail' in event else "Direct"
3135 }
3136 else:
3137 response_body = {
3138 "status": "processed",
3139 "message": f"Bedrock status {status_code} received",
3140 "recorded": False,
3141 "event_type": "EventBridge" if 'detail' in event else "Direct"
3142 }
3143
3144 return {
3145 'statusCode': 200, # Always return 200 for EventBridge
3146 'body': json.dumps(response_body)
3147 }
3148
3149 except Exception as e:
3150 print(f"Error processing Lambda function: {e}")
3151 return {
3152 'statusCode': 500,
3153 'body': json.dumps({
3154 'status': 'error',
3155 'message': f'Internal server error: {str(e)}',
3156 'recorded': False
3157 })
3158 }
3159"#;
3160
3161 zip.start_file("lambda_function.py", options)?;
3162 zip.write_all(python_content.as_bytes())?;
3163
3164 zip.finish()?;
3165 }
3166
3167 Ok(buffer)
3168}