1use aws_config::BehaviorVersion;
2use aws_sdk_lambda::{
3 types::{Environment, Architecture, Runtime, FunctionCode},
4 primitives::Blob,
5 Client as LambdaClient,
6};
7use aws_sdk_iam::Client as IamClient;
8use aws_sdk_cloudwatch::Client as CloudWatchClient;
9use aws_sdk_eventbridge::Client as EventBridgeClient;
10use aws_sdk_cloudtrail::Client as CloudTrailClient;
11use aws_sdk_s3::Client as S3Client;
12use aws_sdk_sns::Client as SnsClient;
13use aws_types::SdkConfig;
14use std::collections::HashMap;
15use std::time::SystemTime;
16use anyhow::{Result, anyhow};
17use tracing::{info, error, warn};
18use aws_credential_types::Credentials;
19use aws_types::region::Region;
20
21async fn create_aws_config_from_env(aws_region: &str) -> Result<SdkConfig> {
23 dotenv::dotenv().ok();
25
26 let access_key_id = std::env::var("AWS_ACCESS_KEY_ID")
28 .map_err(|_| anyhow!("❌ 错误: AWS_ACCESS_KEY_ID 必须在.env文件中设置"))?;
29 let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY")
30 .map_err(|_| anyhow!("❌ 错误: AWS_SECRET_ACCESS_KEY 必须在.env文件中设置"))?;
31
32 let credentials = Credentials::new(access_key_id, secret_access_key, None, None, "env-file");
34
35 let config = aws_config::defaults(BehaviorVersion::latest())
37 .region(Region::new(aws_region.to_string()))
38 .credentials_provider(credentials)
39 .load()
40 .await;
41
42 validate_credentials(&config).await?;
44
45 info!("✅ AWS凭证已成功从.env文件加载");
46 Ok(config)
47}
48
49async fn validate_credentials(config: &SdkConfig) -> Result<()> {
51 use aws_sdk_sts::Client as StsClient;
52
53 let sts_client = StsClient::new(config);
54
55 match sts_client.get_caller_identity().send().await {
56 Ok(response) => {
57 if let Some(account_id) = response.account() {
58 info!("✅ AWS凭证验证成功 - 账户ID: {}", account_id);
59 }
60 Ok(())
61 }
62 Err(e) => {
63 error!("❌ AWS凭证验证失败: {}", e);
64 Err(anyhow!("AWS凭证无效,请检查.env文件中的凭证配置"))
65 }
66 }
67}
68
/// Bundle of AWS service clients used to deploy and wire up the Bedrock
/// monitoring stack.
///
/// All clients are constructed from the same [`SdkConfig`] in `AwsManager::new`.
#[derive(Debug)]
pub struct AwsManager {
    /// Client for AWS Lambda operations.
    lambda_client: LambdaClient,
    /// Client for IAM operations.
    iam_client: IamClient,
    /// Client for CloudWatch operations.
    cloudwatch_client: CloudWatchClient,
    /// Client for EventBridge operations.
    eventbridge_client: EventBridgeClient,
    /// Client for CloudTrail operations.
    cloudtrail_client: CloudTrailClient,
    /// Client for S3 operations.
    s3_client: S3Client,
    /// Client for SNS operations.
    sns_client: SnsClient,
    /// Region name passed to `new`; also interpolated into hand-built ARNs.
    region: String,
    /// Shared SDK configuration, kept so an STS client can be built on demand.
    sdk_config: SdkConfig,
}
81
82impl AwsManager {
83 pub async fn new(aws_region: &str) -> Result<Self> {
84 let config = create_aws_config_from_env(aws_region).await?;
86
87 let lambda_client = LambdaClient::new(&config);
88 let iam_client = IamClient::new(&config);
89 let cloudwatch_client = CloudWatchClient::new(&config);
90 let eventbridge_client = EventBridgeClient::new(&config);
91 let cloudtrail_client = CloudTrailClient::new(&config);
92 let s3_client = S3Client::new(&config);
93 let sns_client = SnsClient::new(&config);
94
95 Ok(AwsManager {
96 lambda_client,
97 iam_client,
98 cloudwatch_client,
99 eventbridge_client,
100 cloudtrail_client,
101 s3_client,
102 sns_client,
103 region: aws_region.to_string(),
104 sdk_config: config,
105 })
106 }
107
108 pub async fn create_lambda_execution_role(&self) -> Result<String> {
109 let role_name = format!("lambda-bedrock-monitor-role-{}",
110 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_secs());
111 self.create_lambda_execution_role_with_name(&role_name).await
112 }
113
114 pub async fn create_lambda_execution_role_with_name(&self, role_name: &str) -> Result<String> {
115 let trust_policy = serde_json::json!({
116 "Version": "2012-10-17",
117 "Statement": [
118 {
119 "Effect": "Allow",
120 "Principal": {
121 "Service": "lambda.amazonaws.com"
122 },
123 "Action": "sts:AssumeRole"
124 }
125 ]
126 });
127
128 info!("创建IAM角色: {}", role_name);
129
130 let create_role_result = self.iam_client.create_role()
131 .role_name(role_name)
132 .assume_role_policy_document(trust_policy.to_string())
133 .send()
134 .await;
135
136 match create_role_result {
137 Ok(result) => {
138 let role_arn = result.role().unwrap().arn().to_string();
139 info!("IAM角色创建成功: {}", role_arn);
140
141 let managed_policy_arns = vec![
142 "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
143 "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
144 ];
145
146 for policy_arn in managed_policy_arns {
147 let _ = self.iam_client.attach_role_policy()
148 .role_name(role_name)
149 .policy_arn(policy_arn)
150 .send()
151 .await;
152 info!("附加策略: {}", policy_arn);
153 }
154
155 let cloudwatch_logs_policy = r#"{
157 "Version": "2012-10-17",
158 "Statement": [
159 {
160 "Effect": "Allow",
161 "Action": [
162 "logs:CreateLogGroup",
163 "logs:CreateLogStream",
164 "logs:PutLogEvents",
165 "logs:DescribeLogGroups",
166 "logs:DescribeLogStreams"
167 ],
168 "Resource": "arn:aws:logs:*:*:*"
169 }
170 ]
171 }"#;
172
173 match self.iam_client.put_role_policy()
174 .role_name(role_name)
175 .policy_name("CloudWatchLogsWriteAccess")
176 .policy_document(cloudwatch_logs_policy.to_string())
177 .send()
178 .await {
179 Ok(_) => info!("CloudWatch Logs写权限策略添加成功"),
180 Err(e) => warn!("CloudWatch Logs权限策略添加失败: {}", e)
181 }
182
183 let credential_management_policy = r#"{
185 "Version": "2012-10-17",
186 "Statement": [
187 {
188 "Effect": "Allow",
189 "Action": [
190 "iam:UpdateAccessKey",
191 "iam:ListAccessKeys",
192 "iam:GetAccessKeyLastUsed",
193 "iam:GetUser"
194 ],
195 "Resource": "*"
196 }
197 ]
198 }"#;
199
200 match self.iam_client.put_role_policy()
201 .role_name(role_name)
202 .policy_name("BedrockCredentialManagement")
203 .policy_document(credential_management_policy.to_string())
204 .send()
205 .await {
206 Ok(_) => info!("🔐 凭据管理权限策略添加成功"),
207 Err(e) => warn!("⚠️ 凭据管理权限策略添加失败: {}", e)
208 }
209
210 let sns_notification_policy = r#"{
212 "Version": "2012-10-17",
213 "Statement": [
214 {
215 "Effect": "Allow",
216 "Action": [
217 "sns:Publish"
218 ],
219 "Resource": "arn:aws:sns:*:*:bedrock-throttling-alerts"
220 }
221 ]
222 }"#;
223
224 match self.iam_client.put_role_policy()
225 .role_name(role_name)
226 .policy_name("BedrockSNSNotifications")
227 .policy_document(sns_notification_policy.to_string())
228 .send()
229 .await {
230 Ok(_) => info!("📧 SNS通知权限策略添加成功"),
231 Err(e) => warn!("⚠️ SNS通知权限策略添加失败: {}", e)
232 }
233
234 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
235 Ok(role_arn)
236 }
237 Err(e) => {
238 error!("IAM角色创建失败: {}", e);
239 Err(anyhow!("创建IAM角色失败: {}", e))
240 }
241 }
242 }
243
244 pub async fn deploy_lambda_function(&self, zip_content: Vec<u8>) -> Result<String> {
245 let function_name = "bedrock-monitor-function";
246
247 info!("开始部署Lambda函数: {}", function_name);
248 info!("ZIP包大小: {} bytes", zip_content.len());
249
250 let function_exists = self.lambda_client.get_function()
252 .function_name(function_name)
253 .send()
254 .await.is_ok();
255
256 let role_arn = if !function_exists {
258 let role_name = "lambda-bedrock-monitor-role";
260 match self.iam_client.get_role().role_name(role_name).send().await {
261 Ok(role_result) => {
262 info!("使用现有IAM角色: {}", role_name);
263 role_result.role().unwrap().arn().to_string()
264 }
265 Err(_) => {
266 info!("创建新IAM角色: {}", role_name);
268 self.create_lambda_execution_role_with_name(role_name).await?
269 }
270 }
271 } else {
272 info!("函数已存在,获取现有角色ARN");
273 let function_config = self.lambda_client.get_function_configuration()
275 .function_name(function_name)
276 .send()
277 .await?;
278 function_config.role().unwrap().to_string()
279 };
280
281 let env_vars = HashMap::from([
282 ("RUST_LOG".to_string(), "info".to_string()),
283 ("BEDROCK_AUTO_CREDENTIAL_DISABLE".to_string(), "true".to_string()),
284 ("BEDROCK_DRY_RUN_MODE".to_string(), "false".to_string()),
285 ]);
287
288 let environment = Environment::builder()
289 .set_variables(Some(env_vars))
290 .build();
291
292 let function_arn = if function_exists {
293 let function_config = self.lambda_client.get_function_configuration()
295 .function_name(function_name)
296 .send()
297 .await?;
298
299 let current_runtime = function_config.runtime().unwrap_or(&Runtime::Providedal2);
300
301 if !matches!(current_runtime, Runtime::Python39 | Runtime::Python38 | Runtime::Python37) {
303 info!("检测到运行时变更,重新创建Lambda函数");
304
305 let _ = self.lambda_client.delete_function()
307 .function_name(function_name)
308 .send()
309 .await;
310
311 info!("创建新的Python Lambda函数");
313 let create_function_result = self.lambda_client.create_function()
314 .function_name(function_name)
315 .runtime(Runtime::Python312)
316 .handler("lambda_function.lambda_handler")
317 .code(FunctionCode::builder().zip_file(Blob::new(zip_content)).build())
318 .role(role_arn)
319 .architectures(Architecture::X8664)
320 .timeout(30)
321 .memory_size(128)
322 .environment(environment)
323 .send()
324 .await;
325
326 match create_function_result {
327 Ok(result) => {
328 result.function_arn().unwrap().to_string()
329 }
330 Err(e) => {
331 error!("Lambda函数创建失败: {}", e);
332 error!("错误类型: {:?}", e);
333 error!("完整错误信息: {:?}", e);
334 return Err(anyhow!("创建Lambda函数失败: {}", e));
335 }
336 }
337 } else {
338 info!("更新现有Lambda函数");
340 let update_result = self.lambda_client.update_function_code()
341 .function_name(function_name)
342 .zip_file(Blob::new(zip_content))
343 .send()
344 .await;
345
346 match update_result {
347 Ok(result) => result.function_arn().unwrap().to_string(),
348 Err(e) => {
349 error!("Lambda函数更新失败: {}", e);
350 return Err(anyhow!("更新Lambda函数失败: {}", e));
351 }
352 }
353 }
354 } else {
355 info!("创建新Lambda函数");
356 let max_retries = 3;
357
358 for attempt in 1..=max_retries {
359 let create_function_result = self.lambda_client.create_function()
360 .function_name(function_name)
361 .runtime(Runtime::Python312)
362 .handler("lambda_function.lambda_handler")
363 .code(FunctionCode::builder().zip_file(Blob::new(zip_content.clone())).build())
364 .role(&role_arn)
365 .architectures(Architecture::X8664)
366 .timeout(30)
367 .memory_size(128)
368 .environment(environment.clone())
369 .send()
370 .await;
371
372 match create_function_result {
373 Ok(result) => {
374 let function_arn = result.function_arn().unwrap().to_string();
375 info!("✅ Lambda函数创建成功: {}", function_arn);
376 return Ok(function_arn);
377 }
378 Err(e) => {
379 let error_str = e.to_string();
380 let raw_error = format!("{:?}", e);
381
382 error!("❌ Lambda函数创建失败 (尝试 {}/{}):", attempt, max_retries);
383 error!("┌─ 错误信息: {}", e);
384 error!("├─ 错误字符串: {}", error_str);
385 error!("└─ 原始错误对象: {}", raw_error);
386 error!("┌─ 当前区域: {}", self.region);
387 error!("├─ 函数名称: {}", function_name);
388 error!("├─ IAM角色ARN: {}", role_arn);
389 error!("└─ 账户ID: {}", self.get_account_id().await.unwrap_or_else(|_| "unknown".to_string()));
390
391 if error_str.contains("ResourceConflictException") ||
393 error_str.contains("already exists") ||
394 error_str.contains("Function already exist") {
395 info!("ℹ️ Lambda函数已存在,获取函数ARN");
396 match self.lambda_client.get_function_configuration()
398 .function_name(function_name)
399 .send()
400 .await {
401 Ok(config) => {
402 let function_arn = config.function_arn().unwrap().to_string();
403 info!("✅ 获取已存在的Lambda函数: {}", function_arn);
404 return Ok(function_arn);
405 }
406 Err(get_e) => {
407 warn!("⚠️ 无法获取已存在的Lambda函数ARN: {}", get_e);
408 let account_id = self.get_account_id().await?;
410 let default_arn = format!("arn:aws:lambda:{}:{}:function:{}",
411 self.region, account_id, function_name);
412 return Ok(default_arn);
413 }
414 }
415 }
416
417 if attempt < max_retries {
418 let wait_time = attempt * 5; info!("等待 {} 秒后重试...", wait_time);
420 tokio::time::sleep(tokio::time::Duration::from_secs(wait_time as u64)).await;
421 } else {
422 error!("🔍 详细错误分析:");
423
424 if error_str.contains("service error") {
425 error!("┌─ 错误类型: AWS服务错误");
426 error!("├─ 可能原因:");
427 error!("│ • AWS服务暂时不可用");
428 error!("│ • 网络连接问题");
429 error!("│ • AWS服务延迟");
430 error!("│ • 区域服务暂时中断");
431 error!("├─ 建议解决方案:");
432 error!("│ • 检查网络连接");
433 error!("│ • 稍后重试");
434 error!("│ • 尝试使用其他AWS区域");
435 error!("│ • 检查AWS服务状态页面");
436 error!("└─ AWS状态页面: https://status.aws.amazon.com/");
437 } else if error_str.contains("AccessDeniedException") {
438 error!("┌─ 错误类型: 权限不足");
439 error!("├─ 可能原因:");
440 error!("│ • IAM角色无权限在{}区域创建Lambda函数", self.region);
441 error!("│ • IAM角色跨区域权限限制");
442 error!("├─ IAM角色: {}", role_arn);
443 error!("└─ 建议解决方案:");
444 error!(" • 检查IAM角色{}的跨区域权限", role_arn);
445 error!(" • 为IAM角色添加lambda:*权限");
446 error!(" • 或者使用IAM:CreateFunction权限");
447 } else if error_str.contains("InvalidParameterValueException") {
448 error!("┌─ 错误类型: 参数无效");
449 error!("├─ 可能原因:");
450 error!("│ • 函数名、角色ARN或配置参数格式错误");
451 error!("│ • 不支持的配置值");
452 error!("└─ 建议解决方案:");
453 error!(" • 检查函数名是否符合命名规范");
454 error!(" • 验证IAM角色ARN格式");
455 error!(" • 检查运行时和配置参数");
456 } else if error_str.contains("LimitExceededException") {
457 error!("┌─ 错误类型: 资源限制");
458 error!("├─ 可能原因:");
459 error!("│ • 账户Lambda函数数量超限");
460 error!("│ • 并发创建限制");
461 error!("└─ 建议解决方案:");
462 error!(" • 删除不需要的Lambda函数");
463 error!(" • 提高AWS服务配额");
464 } else {
465 error!("┌─ 错误类型: 未知错误");
466 error!("└─ 建议联系AWS技术支持");
467 }
468
469 return Err(anyhow!("创建Lambda函数失败: {}", e));
470 }
471 }
472 }
473 }
474
475 return Err(anyhow!("Lambda函数创建失败: 重试{}次后仍然失败", max_retries));
476 };
477
478 info!("Lambda函数部署成功: {}", function_arn);
479 Ok(function_arn)
480 }
481
482 pub async fn create_cloudwatch_alarm(&self, function_name: &str) -> Result<()> {
483 let alarm_name = format!("{}-error-alarm", function_name);
484
485 info!("创建CloudWatch告警: {}", alarm_name);
486
487 let _account_id = self.get_account_id().await?;
488
489 let dimension1 = aws_sdk_cloudwatch::types::Dimension::builder()
490 .name("FunctionName")
491 .value(function_name)
492 .build();
493
494 let _ = self.cloudwatch_client.put_metric_alarm()
495 .alarm_name(&alarm_name)
496 .alarm_description("Lambda函数错误率告警")
497 .namespace("AWS/Lambda")
498 .metric_name("Errors")
499 .dimensions(dimension1)
500 .statistic(aws_sdk_cloudwatch::types::Statistic::Sum)
501 .period(300)
502 .evaluation_periods(1)
503 .threshold(1.0)
504 .comparison_operator(aws_sdk_cloudwatch::types::ComparisonOperator::GreaterThanOrEqualToThreshold)
505 .send()
506 .await;
507
508 info!("CloudWatch告警创建成功");
509 Ok(())
510 }
511
512 pub async fn create_bedrock_eventbridge_rule(&self) -> Result<String> {
513 let rule_name = "bedrock-api-monitor-rule";
514 let max_retries = 3;
515
516 info!("创建EventBridge规则: {}", rule_name);
517
518 match self.eventbridge_client.list_rules()
520 .name_prefix(rule_name)
521 .send()
522 .await {
523 Ok(response) => {
524 let rules = response.rules();
525 for rule in rules {
526 if let Some(name) = rule.name() {
527 if name == rule_name {
528 let rule_arn = rule.arn().unwrap().to_string();
529 info!("✅ EventBridge规则已存在: {}", rule_arn);
530 return Ok(rule_arn);
531 }
532 }
533 }
534 }
535 Err(e) => {
536 warn!("⚠️ 无法检查EventBridge规则是否已存在: {},继续尝试创建", e);
537 }
538 }
539
540 let event_pattern = r#"{
542 "source": ["aws.bedrock"],
543 "detail-type": ["AWS API Call via CloudTrail"],
544 "detail": {
545 "eventSource": ["bedrock.amazonaws.com"],
546 "eventName": ["InvokeModel", "Converse", "InvokeModelWithResponseStream"]
547 }
548 }"#;
549
550 for attempt in 1..=max_retries {
551 let put_rule_result = self.eventbridge_client.put_rule()
552 .name(rule_name)
553 .event_pattern(event_pattern)
554 .state(aws_sdk_eventbridge::types::RuleState::Enabled)
555 .description("专门监控AWS Bedrock invoke-model API调用 - 记录200和429状态")
556 .send()
557 .await;
558
559 match put_rule_result {
560 Ok(result) => {
561 let rule_arn = result.rule_arn().unwrap().to_string();
562 info!("EventBridge规则创建成功: {}", rule_arn);
563 return Ok(rule_arn);
564 }
565 Err(e) => {
566 let error_str = e.to_string();
567 error!("EventBridge规则创建失败 (尝试 {}/{}): {}", attempt, max_retries, e);
568
569 if error_str.contains("ResourceConflictException") ||
571 error_str.contains("already exists") ||
572 error_str.contains("Rule already exists") {
573 info!("ℹ️ EventBridge规则已存在,跳过创建");
574 match self.eventbridge_client.list_rules()
576 .name_prefix(rule_name)
577 .send()
578 .await {
579 Ok(response) => {
580 let rules = response.rules();
581 for rule in rules {
582 if let Some(name) = rule.name() {
583 if name == rule_name {
584 let rule_arn = rule.arn().unwrap().to_string();
585 info!("✅ 获取已存在的EventBridge规则: {}", rule_arn);
586 return Ok(rule_arn);
587 }
588 }
589 }
590 }
591 Err(list_e) => {
592 warn!("⚠️ 无法获取已存在的EventBridge规则ARN: {}", list_e);
593 let account_id = self.get_account_id().await?;
595 let default_arn = format!("arn:aws:events:{}:{}:rule/{}",
596 self.region, account_id, rule_name);
597 return Ok(default_arn);
598 }
599 }
600 }
601
602 if attempt < max_retries {
603 let wait_time = attempt * 3; info!("等待 {} 秒后重试...", wait_time);
605 tokio::time::sleep(tokio::time::Duration::from_secs(wait_time as u64)).await;
606 } else {
607 if error_str.contains("dispatch failure") {
609 error!("❌ EventBridge服务暂时不可用,可能是网络问题或AWS服务延迟");
610 } else if error_str.contains("InternalFailure") {
611 error!("❌ EventBridge内部服务错误");
612 } else if error_str.contains("AccessDeniedException") {
613 error!("❌ 权限不足: 无权限创建EventBridge规则");
614 } else if error_str.contains("LimitExceededException") {
615 error!("❌ EventBridge规则数量超限");
616 } else {
617 error!("❌ 未知EventBridge错误: {}", error_str);
618 }
619
620 return Err(anyhow!("创建EventBridge规则失败: {}", e));
621 }
622 }
623 }
624 }
625
626 Err(anyhow!("EventBridge规则创建失败: 重试{}次后仍然失败", max_retries))
627 }
628
629 pub async fn add_lambda_target_to_eventbridge(&self) -> Result<()> {
630 let rule_name = "bedrock-api-monitor-rule";
631 let function_name = "bedrock-monitor-function";
632
633 info!("为EventBridge规则添加Lambda目标: {} -> {}", rule_name, function_name);
634
635 let function_config = self.lambda_client.get_function_configuration()
637 .function_name(function_name)
638 .send()
639 .await?;
640
641 let function_arn = function_config.function_arn().unwrap();
642
643 let target = aws_sdk_eventbridge::types::Target::builder()
645 .id("1")
646 .arn(function_arn)
647 .build()?;
648
649 let targets_result = self.eventbridge_client.put_targets()
650 .rule(rule_name)
651 .targets(target)
652 .send()
653 .await;
654
655 match targets_result {
656 Ok(_) => {
657 info!("Lambda目标添加成功");
658
659 let eventbridge_permission_exists = match self.lambda_client.get_policy()
661 .function_name(function_name)
662 .send()
663 .await {
664 Ok(policy_result) => {
665 if let Some(policy) = policy_result.policy() {
666 policy.contains("bedrock-eventbridge-invoke") ||
667 policy.contains("EventBridgeInvoke") ||
668 (policy.contains("events.amazonaws.com") &&
669 policy.contains(&format!("arn:aws:events:{}:{}:rule/{}",
670 self.region, self.get_account_id().await?, rule_name)))
671 } else {
672 false
673 }
674 }
675 Err(_) => false
676 };
677
678 let add_permission_result = if eventbridge_permission_exists {
679 None
680 } else {
681 info!("EventBridge权限不存在,开始添加...");
682 Some(self.lambda_client.add_permission()
683 .function_name(function_name)
684 .statement_id("bedrock-eventbridge-invoke")
685 .action("lambda:InvokeFunction")
686 .principal("events.amazonaws.com")
687 .source_arn(format!("arn:aws:events:{}:{}:rule/{}",
688 self.region,
689 self.get_account_id().await?,
690 rule_name
691 ))
692 .send()
693 .await)
694 };
695
696 match add_permission_result {
697 Some(Ok(response)) => {
698 info!("Lambda EventBridge调用权限添加成功");
699 if let Some(statement) = response.statement() {
700 info!("权限声明: {}", statement);
701 }
702 Ok(())
703 }
704 Some(Err(e)) => {
705 let error_str = e.to_string();
706 let raw_error = format!("{:?}", e);
707 error!("Lambda EventBridge调用权限添加失败");
708 error!("错误信息: {}", error_str);
709 error!("原始错误: {}", raw_error);
710
711 if error_str.contains("ResourceConflictException") ||
713 error_str.contains("already exists") ||
714 error_str.contains("The resource-based policy") ||
715 error_str.contains("already has a statement") {
716 info!("Lambda EventBridge调用权限已存在,跳过添加");
717 Ok(())
718 } else if error_str.contains("InvalidParameterValueException") {
719 warn!("⚠️ EventBridge权限参数无效: 检查函数名和规则ARN格式,但继续部署");
720 Ok(()) } else if error_str.contains("AccessDeniedException") {
722 warn!("⚠️ 权限不足: 无权限为Lambda函数添加EventBridge调用权限,但继续部署");
723 Ok(()) } else if error_str.contains("ResourceNotFoundException") {
725 warn!("⚠️ 资源未找到: Lambda函数或EventBridge规则不存在,但继续部署");
726 Ok(()) } else if error_str.contains("service error") {
728 warn!("⚠️ AWS服务错误,但权限可能已成功添加,继续部署");
729 Ok(()) } else {
731 warn!("Lambda EventBridge调用权限添加失败: {},但继续部署", e);
732 Ok(()) }
734 }
735 None => {
736 info!("EventBridge权限已存在,无需添加");
737 Ok(())
738 }
739 }
740 }
741 Err(e) => {
742 error!("Lambda目标添加失败: {}", e);
743 Err(anyhow!("添加Lambda目标失败: {}", e))
744 }
745 }
746 }
747
748 pub async fn test_lambda_function(&self, payload: &str) -> Result<String> {
749 info!("测试Lambda函数调用,payload: {}", payload);
750
751 match self.lambda_client.invoke()
752 .function_name("bedrock-monitor-function")
753 .payload(aws_sdk_lambda::primitives::Blob::new(payload.as_bytes()))
754 .send()
755 .await {
756 Ok(response) => {
757 info!("Lambda调用成功");
758 let mut result = String::new();
759
760 if let Some(payload) = response.payload() {
761 let payload_str = String::from_utf8(payload.as_ref().to_vec())?;
762 info!("响应: {}", payload_str);
763 result.push_str(&format!("Response: {}\n", payload_str));
764 }
765
766 if let Some(log_result) = response.log_result() {
767 info!("日志: {}", log_result);
768 result.push_str(&format!("Logs: {}\n", log_result));
769 }
770
771 if let Some(function_error) = response.function_error() {
772 let error_str = std::str::from_utf8(function_error.as_ref())
773 .unwrap_or("无法解析错误信息");
774 error!("函数错误: {}", error_str);
775 result.push_str(&format!("Error: {}\n", error_str));
776 }
777
778 Ok(result)
779 }
780 Err(e) => {
781 error!("Lambda调用失败: {}", e);
782 Err(anyhow!("Lambda调用失败: {}", e))
783 }
784 }
785 }
786
787 pub async fn create_lambda_function_url(&self) -> Result<String> {
788 let function_name = "bedrock-monitor-function";
789 info!("创建Lambda Function URL for: {}", function_name);
790
791 let _function_config = self.lambda_client.get_function_configuration()
793 .function_name(function_name)
794 .send()
795 .await?;
796
797 let _create_url_config_result = self.lambda_client.create_function_url_config()
799 .function_name(function_name)
800 .auth_type(aws_sdk_lambda::types::FunctionUrlAuthType::None)
801 .invoke_mode(aws_sdk_lambda::types::InvokeMode::Buffered)
802 .send()
803 .await?;
804
805 let account_id = self.get_account_id().await?;
807
808 let _ = self.lambda_client.add_permission()
809 .function_name(function_name)
810 .statement_id("FunctionURLAllowPublicAccess")
811 .action("lambda:InvokeFunctionUrl")
812 .principal("*")
813 .function_url_auth_type(aws_sdk_lambda::types::FunctionUrlAuthType::None)
814 .source_arn(&format!("arn:aws:lambda:{}:{}:function:{}",
815 self.region, account_id, function_name))
816 .send()
817 .await?;
818
819 let function_url = format!(
821 "https://{}.lambda-url.{}.on.aws",
822 function_name, self.region
823 );
824
825 info!("Lambda Function URL创建成功: {}", function_url);
826 Ok(function_url)
827 }
828
829 pub async fn setup_cloudtrail_for_bedrock(&self) -> Result<String> {
830 let trail_name = "bedrock-api-monitoring-trail";
831 let account_id = self.get_account_id().await?;
832 let s3_bucket_name = format!("bedrock-cloudtrail-logs-{}", account_id);
833
834 info!("设置CloudTrail以监控Bedrock API调用: {}", trail_name);
835
836 self.create_s3_bucket_if_not_exists(&s3_bucket_name).await?;
838
839 let create_trail_result = self.cloudtrail_client.create_trail()
841 .name(trail_name)
842 .s3_bucket_name(s3_bucket_name)
843 .include_global_service_events(true)
844 .is_multi_region_trail(true)
845 .enable_log_file_validation(true)
846 .send()
847 .await;
848
849 let trail_arn = match create_trail_result {
850 Ok(result) => {
851 let arn = result.trail_arn().unwrap().to_string();
852 info!("CloudTrail创建成功: {}", arn);
853 arn
854 }
855 Err(e) => {
856 error!("CloudTrail创建失败: {}", e);
857 return Err(anyhow!("创建CloudTrail失败: {}", e));
858 }
859 };
860
861 info!("配置CloudTrail数据事件...");
863 self.configure_cloudtrail_data_events(trail_name).await?;
864
865 let _ = self.cloudtrail_client.start_logging()
867 .name(trail_name)
868 .send()
869 .await?;
870
871 info!("CloudTrail启动成功");
872 Ok(trail_arn)
873 }
874
875 async fn create_s3_bucket_if_not_exists(&self, bucket_name: &str) -> Result<()> {
876 info!("检查S3存储桶: {}", bucket_name);
877
878 match self.s3_client.head_bucket().bucket(bucket_name).send().await {
880 Ok(_) => {
881 info!("S3存储桶已存在: {}", bucket_name);
882 return Ok(());
883 }
884 Err(_) => {
885 info!("创建新的S3存储桶: {}", bucket_name);
886 }
887 }
888
889 let create_bucket_result = self.s3_client.create_bucket()
891 .bucket(bucket_name)
892 .send()
893 .await;
894
895 match create_bucket_result {
896 Ok(_) => {
897 info!("S3存储桶创建成功: {}", bucket_name);
898 }
899 Err(e) => {
900 error!("S3存储桶创建失败: {}", e);
901 return Err(anyhow!("创建S3存储桶失败: {}", e));
902 }
903 }
904
905 Ok(())
906 }
907
908 async fn configure_cloudtrail_data_events(&self, trail_name: &str) -> Result<()> {
909 info!("配置CloudTrail数据事件以监控Bedrock");
910
911 let account_id = self.get_account_id().await?;
913
914 let data_resource = aws_sdk_cloudtrail::types::DataResource::builder()
917 .r#type("AWS::Bedrock::Model")
918 .values(&format!("arn:aws:bedrock:{}:{}:model/*", self.region, account_id))
919 .build();
920
921 let event_selector = aws_sdk_cloudtrail::types::EventSelector::builder()
922 .read_write_type(aws_sdk_cloudtrail::types::ReadWriteType::All)
923 .include_management_events(true)
924 .data_resources(data_resource)
925 .build();
926
927 let _ = self.cloudtrail_client.put_event_selectors()
929 .trail_name(trail_name)
930 .event_selectors(event_selector)
931 .send()
932 .await?;
933
934
935 info!("CloudTrail数据事件配置完成");
936 Ok(())
937 }
938
939 pub async fn add_eventbridge_lambda_permission(&self) -> Result<()> {
940 let function_name = "bedrock-monitor-function";
941 let statement_id = "EventBridgeInvoke";
942 let action = "lambda:InvokeFunction";
943 let principal = "events.amazonaws.com";
944
945 let account_id = self.get_account_id().await?;
946 let source_arn = format!("arn:aws:events:{}:{}:rule/bedrock-api-monitor-rule",
947 self.region, account_id);
948
949 info!("添加EventBridge调用Lambda权限: {} -> {}", source_arn, function_name);
950
951 let result = self.lambda_client.add_permission()
952 .function_name(function_name)
953 .statement_id(statement_id)
954 .action(action)
955 .principal(principal)
956 .source_arn(&source_arn)
957 .send()
958 .await;
959
960 match result {
961 Ok(_) => {
962 info!("EventBridge Lambda权限添加成功");
963 Ok(())
964 }
965 Err(e) => {
966 error!("EventBridge Lambda权限添加失败: {}", e);
967 Err(anyhow!("添加Lambda EventBridge调用权限失败: {}", e))
968 }
969 }
970 }
971
972 async fn get_account_id(&self) -> Result<String> {
973 let sts_client = aws_sdk_sts::Client::new(&self.sdk_config);
974 let identity = sts_client.get_caller_identity().send().await?;
975 Ok(identity.account().unwrap().to_string())
976 }
977
978 pub async fn create_sns_topic(&self) -> Result<String> {
980 let topic_name = "bedrock-throttling-alerts";
981
982 info!("创建SNS主题: {}", topic_name);
983
984 let result = self.sns_client.create_topic()
985 .name(topic_name)
986 .send()
987 .await;
988
989 match result {
990 Ok(response) => {
991 let topic_arn = response.topic_arn().unwrap().to_string();
992 info!("SNS主题创建成功: {}", topic_arn);
993 Ok(topic_arn)
994 }
995 Err(e) => {
996 error!("SNS主题创建失败: {}", e);
997 Err(anyhow!("创建SNS主题失败: {}", e))
998 }
999 }
1000 }
1001
1002 pub async fn create_bedrock_cloudwatch_alarms(&self, sns_topic_arn: &str) -> Result<()> {
1004 info!("开始创建Bedrock监控CloudWatch告警...");
1005
1006 self.create_single_alarm(
1010 "bedrock-InvocationThrottles-Immediate",
1011 "立即响应告警 - 检测到1次429时就触发AK/SK关闭",
1012 "AWS/Bedrock",
1013 "InvocationThrottles",
1014 10, 1, 1.0, Some(sns_topic_arn),
1018 None,
1019 ).await?;
1020
1021 self.create_single_alarm(
1023 "bedrock-InvocationThrottles-HighFrequency",
1024 "高频Bedrock API限流告警 - 当Bedrock API限流频率异常时触发",
1025 "AWS/Bedrock",
1026 "InvocationThrottles",
1027 60, 2, 10.0, Some(sns_topic_arn),
1031 None,
1032 ).await?;
1033
1034 let general_dimension = aws_sdk_cloudwatch::types::Dimension::builder()
1036 .name("ModelId")
1037 .value("*")
1038 .build();
1039
1040 self.create_single_alarm(
1041 "bedrock-InvocationThrottles-General",
1042 "通用Bedrock API限流告警 - 对任何模型限流超过3次时告警",
1043 "AWS/Bedrock",
1044 "InvocationThrottles",
1045 60, 3, 5.0, Some(sns_topic_arn),
1049 Some(vec![general_dimension]),
1050 ).await?;
1051
1052 self.create_model_specific_alarm(
1054 "anthropic.claude-3-5-sonnet-20240620-v1:0",
1055 sns_topic_arn,
1056 ).await?;
1057
1058 self.create_model_specific_alarm(
1060 "anthropic.claude-3-sonnet-20240229-v1:0",
1061 sns_topic_arn,
1062 ).await?;
1063
1064 self.create_model_specific_alarm(
1066 "anthropic.claude-3-5-haiku-20241022-v1:0",
1067 sns_topic_arn,
1068 ).await?;
1069
1070 self.create_single_alarm(
1072 "bedrock-InvocationClientErrors",
1073 "Bedrock客户端错误告警 - 当客户端错误超过5次时告警",
1074 "AWS/Bedrock",
1075 "InvocationClientErrors",
1076 60, 2, 5.0, Some(sns_topic_arn),
1080 None,
1081 ).await?;
1082
1083 self.create_single_alarm(
1085 "bedrock-InvocationServerErrors",
1086 "Bedrock服务器错误告警 - 当服务器错误超过3次时告警",
1087 "AWS/Bedrock",
1088 "InvocationServerErrors",
1089 60, 1, 3.0, Some(sns_topic_arn),
1093 None,
1094 ).await?;
1095
1096 info!("✅ 所有Bedrock监控CloudWatch告警创建完成");
1097 Ok(())
1098 }
1099
1100 async fn create_single_alarm(
1102 &self,
1103 alarm_name: &str,
1104 description: &str,
1105 namespace: &str,
1106 metric_name: &str,
1107 period: i64,
1108 evaluation_periods: i32,
1109 threshold: f64,
1110 sns_topic_arn: Option<&str>,
1111 dimensions: Option<Vec<aws_sdk_cloudwatch::types::Dimension>>,
1112 ) -> Result<()> {
1113 info!("创建CloudWatch告警: {}", alarm_name);
1114
1115 let mut alarm_request = self.cloudwatch_client.put_metric_alarm()
1116 .alarm_name(alarm_name)
1117 .alarm_description(description)
1118 .namespace(namespace)
1119 .metric_name(metric_name)
1120 .statistic(aws_sdk_cloudwatch::types::Statistic::Sum)
1121 .period(period.try_into()?)
1122 .evaluation_periods(evaluation_periods)
1123 .threshold(threshold)
1124 .comparison_operator(aws_sdk_cloudwatch::types::ComparisonOperator::GreaterThanThreshold);
1125
1126 if let Some(topic_arn) = sns_topic_arn {
1128 alarm_request = alarm_request.alarm_actions(topic_arn);
1129 }
1130
1131 if let Some(dims) = dimensions {
1133 for dim in dims {
1134 alarm_request = alarm_request.dimensions(dim);
1135 }
1136 }
1137
1138 match alarm_request.send().await {
1139 Ok(_) => {
1140 info!("CloudWatch告警创建成功: {}", alarm_name);
1141 Ok(())
1142 }
1143 Err(e) => {
1144 error!("CloudWatch告警创建失败 {}: {}", alarm_name, e);
1145 Err(anyhow!("创建CloudWatch告警失败: {}", e))
1146 }
1147 }
1148 }
1149
1150 async fn create_model_specific_alarm(&self, model_id: &str, sns_topic_arn: &str) -> Result<()> {
1152 let alarm_name = format!("bedrock-InvocationThrottles-{}", model_id.replace(':', "-"));
1153 let description = format!("Claude模型 {} 限流告警 - 当模型1分钟内限流超过3次时告警", model_id);
1154
1155 let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
1156 .name("ModelId")
1157 .value(model_id)
1158 .build();
1159
1160 self.create_single_alarm(
1161 &alarm_name,
1162 &description,
1163 "AWS/Bedrock",
1164 "InvocationThrottles",
1165 60, 1, 3.0, Some(sns_topic_arn),
1169 Some(vec![dimension]),
1170 ).await
1171 }
1172
1173 pub async fn configure_sns_lambda_trigger(&self, sns_topic_arn: &str) -> Result<()> {
1175 let function_name = "bedrock-monitor-function";
1176
1177 info!("配置SNS到Lambda的触发器: {} -> {}", sns_topic_arn, function_name);
1178
1179 let function_config = self.lambda_client.get_function_configuration()
1181 .function_name(function_name)
1182 .send()
1183 .await?;
1184
1185 let function_arn = function_config.function_arn().unwrap();
1186
1187 let result = self.sns_client.subscribe()
1189 .topic_arn(sns_topic_arn)
1190 .protocol("lambda")
1191 .endpoint(function_arn)
1192 .return_subscription_arn(true)
1193 .send()
1194 .await;
1195
1196 match result {
1197 Ok(response) => {
1198 let subscription_arn = response.subscription_arn().unwrap().to_string();
1199 info!("SNS订阅创建成功: {}", subscription_arn);
1200
1201 match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
1203 Ok(_) => info!("SNS Lambda权限添加成功"),
1204 Err(e) => {
1205 warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
1208 }
1209 }
1210
1211 Ok(())
1212 }
1213 Err(e) => {
1214 let error_str = e.to_string();
1215 error!("SNS订阅创建失败: {}", e);
1216
1217 if error_str.contains("already exists") ||
1219 error_str.contains("ResourceConflictException") ||
1220 error_str.contains("Duplicate subscription") ||
1221 error_str.contains("already a subscription") {
1222 info!("SNS订阅已存在,跳过创建");
1223 match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
1225 Ok(_) => info!("SNS Lambda权限添加成功"),
1226 Err(e) => {
1227 warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
1228 }
1229 }
1230 Ok(())
1231 } else if error_str.contains("dispatch failure") {
1232 warn!("⚠️ SNS订阅可能已存在或网络问题,但继续部署");
1233 match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
1235 Ok(_) => info!("SNS Lambda权限添加成功"),
1236 Err(e) => {
1237 warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
1238 }
1239 }
1240 Ok(())
1241 } else if error_str.contains("InvalidParameterException") {
1242 error!("❌ SNS订阅参数无效: 检查主题ARN或Lambda函数ARN格式");
1243 Err(anyhow!("SNS订阅参数无效: {}", error_str))
1244 } else if error_str.contains("AccessDeniedException") {
1245 error!("❌ 权限不足: 无权限创建SNS订阅");
1246 Err(anyhow!("权限不足: {}", error_str))
1247 } else if error_str.contains("NotFoundException") {
1248 error!("❌ 资源未找到: SNS主题或Lambda函数不存在");
1249 Err(anyhow!("资源未找到: {}", error_str))
1250 } else {
1251 error!("❌ 未知SNS订阅错误: {}", error_str);
1252 Err(anyhow!("创建SNS订阅失败: {}", e))
1253 }
1254 }
1255 }
1256 }
1257
1258 async fn add_sns_lambda_permission(&self, function_name: &str, sns_topic_arn: &str) -> Result<()> {
1260 let statement_id = "sns-invoke-permission";
1261
1262 info!("添加SNS调用Lambda权限: {} -> {}", sns_topic_arn, function_name);
1263
1264 let sns_permission_exists = match self.lambda_client.get_policy()
1266 .function_name(function_name)
1267 .send()
1268 .await {
1269 Ok(policy_result) => {
1270 if let Some(policy) = policy_result.policy() {
1271 policy.contains("sns-invoke-permission") ||
1272 (policy.contains("sns.amazonaws.com") && policy.contains(sns_topic_arn))
1273 } else {
1274 false
1275 }
1276 }
1277 Err(_) => false
1278 };
1279
1280 if sns_permission_exists {
1281 info!("✅ SNS权限已存在,跳过添加");
1282 return Ok(());
1283 }
1284
1285 let result = self.lambda_client.add_permission()
1286 .function_name(function_name)
1287 .statement_id(statement_id)
1288 .action("lambda:InvokeFunction")
1289 .principal("sns.amazonaws.com")
1290 .source_arn(sns_topic_arn)
1291 .send()
1292 .await;
1293
1294 match result {
1295 Ok(response) => {
1296 info!("SNS Lambda权限添加成功");
1297 if let Some(statement) = response.statement() {
1298 info!("权限声明: {}", statement);
1299 }
1300 Ok(())
1301 }
1302 Err(e) => {
1303 let error_str = e.to_string();
1304 let raw_error = format!("{:?}", e);
1305 error!("SNS Lambda权限添加失败");
1306 error!("错误信息: {}", error_str);
1307 error!("原始错误: {}", raw_error);
1308
1309 match self.lambda_client.get_policy()
1311 .function_name(function_name)
1312 .send()
1313 .await {
1314 Ok(policy_result) => {
1315 if let Some(policy) = policy_result.policy() {
1316 info!("当前Lambda函数策略已存在");
1317 if policy.contains("sns-invoke-permission") {
1318 info!("✅ SNS权限已存在于策略中,无需重复添加");
1319 return Ok(());
1320 }
1321 }
1322 }
1323 Err(_) => {
1324 warn!("无法检查Lambda函数策略状态");
1325 }
1326 }
1327
1328 if error_str.contains("ResourceConflictException") ||
1330 error_str.contains("already exists") ||
1331 error_str.contains("The resource-based policy") ||
1332 error_str.contains("already has a statement") {
1333 info!("SNS Lambda权限已存在,跳过添加");
1334 Ok(())
1335 } else if error_str.contains("InvalidParameterValueException") {
1336 error!("❌ SNS权限参数无效: 检查函数名和SNS主题ARN格式");
1337 Err(anyhow!("SNS权限参数无效: {}", error_str))
1338 } else if error_str.contains("AccessDeniedException") {
1339 error!("❌ 权限不足: 无权限为Lambda函数添加SNS调用权限");
1340 Err(anyhow!("权限不足: {}", error_str))
1341 } else if error_str.contains("ResourceNotFoundException") {
1342 error!("❌ 资源未找到: Lambda函数或SNS主题不存在");
1343 Err(anyhow!("资源未找到: {}", error_str))
1344 } else if error_str.contains("service error") {
1345 warn!("⚠️ AWS服务错误,但权限可能已成功添加,继续部署");
1346 Ok(())
1347 } else {
1348 error!("❌ 未知SNS权限错误: {}", error_str);
1349 Err(anyhow!("添加SNS Lambda权限失败: {}", e))
1350 }
1351 }
1352 }
1353 }
1354
1355 pub async fn deploy_complete_monitoring_stack(&self, zip_content: Vec<u8>) -> Result<()> {
1357 info!("🚀 开始部署完整的Bedrock监控基础设施...");
1358
1359 let function_arn = self.deploy_lambda_function(zip_content).await?;
1361 info!("✅ Lambda函数部署成功: {}", function_arn);
1362
1363 let max_retries = 3;
1365
1366 for attempt in 1..=max_retries {
1367 match self.update_lambda_environment().await {
1368 Ok(_) => {
1369 info!("✅ Lambda环境变量更新成功 (尝试 {})", attempt);
1370 break;
1371 }
1372 Err(e) => {
1373 warn!("⚠️ Lambda环境变量更新失败 (尝试 {}/{}): {}", attempt, max_retries, e);
1374 if attempt < max_retries {
1375 info!("等待 {} 秒后重试...", attempt * 5);
1376 tokio::time::sleep(tokio::time::Duration::from_secs(attempt as u64 * 5)).await;
1377 } else {
1378 error!("❌ Lambda环境变量更新最终失败,但继续部署流程");
1379 }
1380 }
1381 }
1382 }
1383
1384 let sns_topic_arn = self.create_sns_topic().await?;
1386 info!("✅ SNS主题创建成功: {}", sns_topic_arn);
1387
1388 self.create_bedrock_cloudwatch_alarms(&sns_topic_arn).await?;
1390 info!("✅ CloudWatch告警创建成功");
1391
1392 let rule_arn = self.create_bedrock_eventbridge_rule().await?;
1394 info!("✅ EventBridge规则创建成功: {}", rule_arn);
1395
1396 self.add_lambda_target_to_eventbridge().await?;
1398 info!("✅ EventBridge到Lambda触发器配置成功");
1399
1400 self.configure_sns_lambda_trigger(&sns_topic_arn).await?;
1402 info!("✅ SNS到Lambda触发器配置成功");
1403
1404 self.create_cloudwatch_alarm("bedrock-monitor-function").await?;
1406 info!("✅ Lambda错误告警创建成功");
1407
1408 info!("🎉 完整的Bedrock监控基础设施部署成功!");
1409 info!("📊 监控功能包括:");
1410 info!(" - 429错误检测和AK/SK自动关闭");
1411 info!(" - 8个CloudWatch告警");
1412 info!(" - EventBridge API调用监控");
1413 info!(" - SNS告警通知");
1414 info!(" - 实时CloudWatch日志记录");
1415
1416 Ok(())
1417 }
1418
1419 pub async fn update_lambda_environment(&self) -> Result<()> {
1421 let function_name = "bedrock-monitor-function";
1422
1423 info!("更新Lambda函数环境变量: {}", function_name);
1424
1425 match self.lambda_client.get_function_configuration()
1427 .function_name(function_name)
1428 .send()
1429 .await {
1430 Ok(function_config) => {
1431 info!("✅ Lambda函数存在,状态: {:?}", function_config.state());
1432
1433 match function_config.state() {
1435 Some(aws_sdk_lambda::types::State::Pending) => {
1436 warn!("⚠️ Lambda函数正在更新中,等待完成...");
1437 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
1439 }
1440 Some(aws_sdk_lambda::types::State::Active) => {
1441 info!("✅ Lambda函数状态正常");
1442 }
1443 Some(aws_sdk_lambda::types::State::Failed) | Some(aws_sdk_lambda::types::State::Inactive) => {
1444 return Err(anyhow!("❌ Lambda函数状态异常: {:?}", function_config.state()));
1445 }
1446 None => {
1447 warn!("⚠️ 无法获取Lambda函数状态,继续尝试更新");
1448 }
1449 _ => {
1450 warn!("⚠️ Lambda函数处于未知状态: {:?}, 继续尝试更新", function_config.state());
1451 }
1452 }
1453 }
1454 Err(e) => {
1455 error!("❌ 无法获取Lambda函数配置: {}", e);
1456 return Err(anyhow!("Lambda函数不存在或无权限访问: {}", e));
1457 }
1458 }
1459
1460 let env_vars = HashMap::from([
1461 ("RUST_LOG".to_string(), "info".to_string()),
1462 ("BEDROCK_AUTO_CREDENTIAL_DISABLE".to_string(), "true".to_string()),
1463 ("BEDROCK_DRY_RUN_MODE".to_string(), "false".to_string()), ]);
1466
1467 let total_size: usize = env_vars.iter()
1469 .map(|(k, v)| k.len() + v.len() + 2) .sum();
1471
1472 info!("环境变量总大小: {} bytes (限制: 4096 bytes)", total_size);
1473 if total_size > 4096 {
1474 return Err(anyhow!("环境变量总大小超过AWS Lambda限制(4KB): {} bytes", total_size));
1475 }
1476
1477 let environment = Environment::builder()
1478 .set_variables(Some(env_vars))
1479 .build();
1480
1481 info!("正在发送更新请求...");
1482 match self.lambda_client.update_function_configuration()
1483 .function_name(function_name)
1484 .environment(environment)
1485 .send()
1486 .await {
1487 Ok(response) => {
1488 info!("✅ Lambda环境变量更新请求已发送");
1489 info!("函数状态: {:?}", response.state());
1490 info!("最后更新时间: {:?}", response.last_modified());
1491
1492 info!("等待配置更新生效...");
1494 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
1495
1496 match self.lambda_client.get_function_configuration()
1498 .function_name(function_name)
1499 .send()
1500 .await {
1501 Ok(updated_config) => {
1502 if let Some(env) = updated_config.environment() {
1503 if let Some(vars) = env.variables() {
1504 info!("✅ 环境变量验证成功:");
1505 for (key, value) in vars {
1506 info!(" {} = {}", key, value);
1507 }
1508 }
1509 }
1510 info!("✅ Lambda环境变量更新完成");
1511 }
1512 Err(e) => {
1513 warn!("⚠️ 无法验证环境变量更新: {}", e);
1514 }
1515 }
1516
1517 Ok(())
1518 }
1519 Err(e) => {
1520 let error_str = e.to_string();
1521 let raw_error = format!("{:?}", e);
1522
1523 match self.lambda_client.get_function_configuration()
1525 .function_name(function_name)
1526 .send()
1527 .await {
1528 Ok(config) => {
1529 if let Some(env) = config.environment() {
1530 if let Some(vars) = env.variables() {
1531 let has_correct_vars = vars.get("BEDROCK_DRY_RUN_MODE") == Some(&"false".to_string()) &&
1532 vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE") == Some(&"true".to_string()) &&
1533 vars.get("RUST_LOG") == Some(&"info".to_string());
1534
1535 if has_correct_vars {
1536 info!("✅ Lambda环境变量已正确设置,无需重复更新");
1537 if error_str.contains("ResourceConflictException") {
1538 info!("ℹ️ 检测到Lambda函数正在更新,但环境变量已正确配置");
1539 } else if error_str.contains("service error") {
1540 info!("ℹ️ 检测到AWS服务错误,但环境变量已正确配置");
1541 }
1542 return Ok(());
1543 }
1544 }
1545 }
1546 }
1547 Err(_) => {
1548 warn!("无法验证当前环境变量状态");
1550 }
1551 }
1552
1553 error!("❌ Lambda环境变量更新失败");
1555 error!("错误信息: {}", error_str);
1556 error!("原始错误: {}", raw_error);
1557
1558 if error_str.contains("ResourceConflictException") {
1560 error!("❌ 资源冲突: Lambda函数正在被其他操作更新");
1561 } else if error_str.contains("InvalidParameterValueException") {
1562 error!("❌ 参数无效: 环境变量格式或内容有问题");
1563 } else if error_str.contains("TooManyRequestsException") {
1564 error!("❌ 请求过多: AWS API调用频率超限");
1565 } else if error_str.contains("AccessDeniedException") {
1566 error!("❌ 权限不足: 无权限更新Lambda函数配置");
1567 } else if error_str.contains("ResourceNotFoundException") {
1568 error!("❌ 资源未找到: Lambda函数不存在");
1569 } else if error_str.contains("service error") {
1570 warn!("⚠️ AWS服务错误,可能环境变量已更新,将进行验证");
1571 return Err(anyhow!("更新Lambda环境变量失败: {}", e));
1574 } else {
1575 error!("❌ 未知错误类型: {}", error_str);
1576 }
1577
1578 Err(anyhow!("更新Lambda环境变量失败: {}", e))
1579 }
1580 }
1581 }
1582
1583 pub async fn update_iam_credentials_policy(&self) -> Result<()> {
1585 let role_name = "lambda-bedrock-monitor-role";
1586
1587 info!("更新IAM角色AK/SK管理权限: {}", role_name);
1588
1589 let credential_management_policy = r#"{
1591 "Version": "2012-10-17",
1592 "Statement": [
1593 {
1594 "Effect": "Allow",
1595 "Action": [
1596 "iam:UpdateAccessKey",
1597 "iam:ListAccessKeys",
1598 "iam:GetAccessKeyLastUsed",
1599 "iam:GetUser"
1600 ],
1601 "Resource": "*"
1602 }
1603 ]
1604 }"#;
1605
1606 match self.iam_client.put_role_policy()
1607 .role_name(role_name)
1608 .policy_name("BedrockCredentialManagement")
1609 .policy_document(credential_management_policy.to_string())
1610 .send()
1611 .await {
1612 Ok(_) => {
1613 info!("✅ IAM角色AK/SK管理权限更新成功");
1614 Ok(())
1615 }
1616 Err(e) => {
1617 error!("❌ IAM角色AK/SK管理权限更新失败: {}", e);
1618 Err(anyhow!("更新IAM角色权限失败: {}", e))
1619 }
1620 }
1621 }
1622
1623 pub async fn fix_ak_sk_auto_disable(&self) -> Result<()> {
1625 info!("🔧 开始修复AK/SK自动关闭功能...");
1626
1627 self.update_iam_credentials_policy().await?;
1629 info!("✅ IAM权限已更新支持多区域");
1630
1631 self.update_lambda_environment().await?;
1633 info!("✅ Lambda环境变量已更新(关闭试运行模式)");
1634
1635 info!("🎉 AK/SK自动关闭功能修复完成!");
1636 info!("📊 修复内容:");
1637 info!(" - IAM权限现在支持所有区域");
1638 info!(" - BEDROCK_DRY_RUN_MODE=false(正式运行模式)");
1639 info!(" - BEDROCK_AUTO_CREDENTIAL_DISABLE=true(启用自动禁用)");
1640
1641 Ok(())
1642 }
1643}
1644
/// Drives deployment and status checks across a list of AWS regions.
#[derive(Debug, Clone)]
pub struct MultiRegionDeployer {
    /// Region names to operate on, in the order they are processed.
    regions: Vec<String>,
}
1649
1650impl MultiRegionDeployer {
1651 pub fn from_env() -> Result<Self> {
1653 dotenv::dotenv().ok();
1654
1655 let regions_str = std::env::var("AWS_REGION")
1656 .map_err(|_| anyhow!("❌ 错误: AWS_REGION 必须在.env文件中设置"))?;
1657
1658 let regions: Vec<String> = regions_str
1659 .split(',')
1660 .map(|s| s.trim().to_string())
1661 .filter(|s| !s.is_empty())
1662 .collect();
1663
1664 if regions.is_empty() {
1665 return Err(anyhow!("❌ 错误: AWS_REGION 不能为空"));
1666 }
1667
1668 info!("📍 配置的AWS区域: {}", regions.join(", "));
1669 info!("🌍 总计 {} 个区域", regions.len());
1670
1671 Ok(MultiRegionDeployer { regions })
1672 }
1673
1674 pub async fn deploy_to_all_regions(&self) -> Result<Vec<RegionDeployResult>> {
1676 info!("🚀 开始在所有 {} 个区域部署Bedrock监控系统", self.regions.len());
1677
1678 let zip_content = create_lambda_zip()?;
1679 info!("✅ Lambda函数包创建完成,大小: {} bytes", zip_content.len());
1680
1681 let mut results = Vec::new();
1682 let mut success_count = 0;
1683 let mut failed_regions = Vec::new();
1684
1685 for (index, region) in self.regions.iter().enumerate() {
1686 info!("\n📍 [{}/{}] 部署到区域: {}", index + 1, self.regions.len(), region);
1687 info!("{}", "=".repeat(60));
1688
1689 let start_time = std::time::Instant::now();
1690 let result = self.deploy_to_single_region(region, &zip_content).await;
1691 let duration = start_time.elapsed();
1692
1693 match result {
1694 Ok(deploy_result) => {
1695 success_count += 1;
1696 info!("✅ 区域 {} 部署成功 (耗时: {:?})", region, duration);
1697 results.push(RegionDeployResult {
1698 region: region.clone(),
1699 status: DeployStatus::Success,
1700 duration,
1701 details: deploy_result,
1702 });
1703 }
1704 Err(e) => {
1705 failed_regions.push((region.clone(), anyhow::anyhow!("{}", e)));
1706 error!("❌ 区域 {} 部署失败 (耗时: {:?}): {}", region, duration, e);
1707 results.push(RegionDeployResult {
1708 region: region.clone(),
1709 status: DeployStatus::Failed(e),
1710 duration,
1711 details: String::new(),
1712 });
1713 }
1714 }
1715
1716 if index < self.regions.len() - 1 {
1718 info!("⏳ 等待 3 秒后部署下一个区域...");
1719 tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1720 }
1721 }
1722
1723 self.print_deployment_summary(&results, success_count, &failed_regions).await;
1725
1726 if success_count == 0 {
1727 return Err(anyhow!("❌ 所有区域部署失败"));
1728 } else if failed_regions.len() > 0 {
1729 warn!("⚠️ {} 个区域部署失败,请检查错误信息", failed_regions.len());
1730 }
1731
1732 Ok(results)
1733 }
1734
1735 async fn deploy_to_single_region(&self, region: &str, zip_content: &[u8]) -> Result<String> {
1737 let aws_manager = AwsManager::new(region).await?;
1738
1739 aws_manager.deploy_complete_monitoring_stack(zip_content.to_vec()).await?;
1741
1742 aws_manager.fix_ak_sk_auto_disable().await?;
1744
1745 Ok(format!("区域 {} 部署完成", region))
1746 }
1747
1748 async fn print_deployment_summary(&self, results: &[RegionDeployResult], success_count: usize, failed_regions: &[(String, anyhow::Error)]) {
1750 info!("\n{}", "=".repeat(80));
1751 info!("🎉 多区域部署完成总结");
1752 info!("{}", "=".repeat(80).as_str());
1753
1754 info!("📊 部署统计:");
1755 info!(" ✅ 成功区域: {}/{}", success_count, self.regions.len());
1756 info!(" ❌ 失败区域: {}/{}", failed_regions.len(), self.regions.len());
1757 info!(" ⏱️ 总耗时: {:?}",
1758 results.iter().map(|r| r.duration).sum::<std::time::Duration>());
1759
1760 if success_count > 0 {
1761 info!("\n✅ 成功部署的区域:");
1762 for result in results.iter().filter(|r| matches!(r.status, DeployStatus::Success)) {
1763 info!(" 🌍 {} (耗时: {:?})", result.region, result.duration);
1764 }
1765 }
1766
1767 if !failed_regions.is_empty() {
1768 info!("\n❌ 失败的区域:");
1769 for (region, error) in failed_regions {
1770 info!(" 🌍 {}: {}", region, error);
1771 }
1772 }
1773
1774 if success_count > 0 {
1775 info!("\n🚨 429检测和AK/SK自动关闭机制已激活:");
1776 info!(" 🔍 监控范围: {} 个AWS区域", success_count);
1777 info!(" ⚡ 检测速度: 实时检测429错误");
1778 info!(" 🔒 自动响应: 立即禁用相关AK/SK");
1779 info!(" 📊 告警覆盖: 每个区域8个CloudWatch告警");
1780
1781 info!("\n📋 每个区域部署的组件:");
1782 info!(" ✅ Lambda函数: bedrock-monitor-function");
1783 info!(" ✅ SNS主题: bedrock-throttling-alerts");
1784 info!(" ✅ CloudWatch告警: 8个专业告警");
1785 info!(" ✅ EventBridge规则: API调用监控");
1786 info!(" ✅ 触发器配置: SNS+EventBridge");
1787 info!(" ✅ 日志系统: 标准日志+详细日志");
1788 }
1789
1790 info!("\n🔍 验证建议:");
1791 info!(" 1. 检查各区域的CloudWatch告警状态");
1792 info!(" 2. 查看Lambda函数日志输出");
1793 info!(" 3. 监控Bedrock API调用情况");
1794 info!(" 4. 测试429错误响应机制");
1795 info!(" 5. 验证SNS告警通知系统");
1796 }
1797}
1798
1799#[derive(Debug)]
1801pub struct RegionDeployResult {
1802 pub region: String,
1803 pub status: DeployStatus,
1804 pub duration: std::time::Duration,
1805 pub details: String,
1806}
1807
1808#[derive(Debug)]
1810pub enum DeployStatus {
1811 Success,
1812 Failed(anyhow::Error),
1813}
1814
1815impl MultiRegionDeployer {
1817 pub async fn check_deployment_status(&self) -> Result<Vec<RegionStatus>> {
1819 info!("🔍 开始检查所有 {} 个区域的部署状态", self.regions.len());
1820
1821 let mut region_statuses = Vec::new();
1822
1823 for (index, region) in self.regions.iter().enumerate() {
1824 info!("\n📍 [{}/{}] 检查区域: {}", index + 1, self.regions.len(), region);
1825
1826 let status = self.check_single_region_status(region).await;
1827 region_statuses.push(status);
1828
1829 if index < self.regions.len() - 1 {
1831 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
1832 }
1833 }
1834
1835 self.print_status_summary(®ion_statuses).await;
1836 Ok(region_statuses)
1837 }
1838
1839 async fn check_single_region_status(&self, region: &str) -> RegionStatus {
1841 let start_time = std::time::Instant::now();
1842 let mut status = RegionStatus {
1843 region: region.to_string(),
1844 lambda_function: false,
1845 sns_topic: false,
1846 eventbridge_rule: false,
1847 cloudwatch_alarms: 0,
1848 lambda_environment: false,
1849 iam_permissions: false,
1850 total_duration: std::time::Duration::default(),
1851 is_healthy: false,
1852 };
1853
1854 if let Ok(aws_manager) = AwsManager::new(region).await {
1856 status.lambda_function = self.check_lambda_function(&aws_manager).await;
1858
1859 status.sns_topic = self.check_sns_topic(&aws_manager).await;
1861
1862 status.eventbridge_rule = self.check_eventbridge_rule(&aws_manager).await;
1864
1865 status.cloudwatch_alarms = self.count_cloudwatch_alarms(&aws_manager).await;
1867
1868 if status.lambda_function {
1870 status.lambda_environment = self.check_lambda_environment(&aws_manager).await;
1871 }
1872
1873 status.iam_permissions = self.check_iam_permissions(&aws_manager).await;
1875
1876 status.is_healthy = status.lambda_function &&
1878 status.sns_topic &&
1879 status.eventbridge_rule &&
1880 status.cloudwatch_alarms >= 5 && status.lambda_environment &&
1882 status.iam_permissions;
1883 }
1884
1885 status.total_duration = start_time.elapsed();
1886
1887 self.print_region_status(&status).await;
1889
1890 status
1891 }
1892
1893 async fn check_lambda_function(&self, aws_manager: &AwsManager) -> bool {
1895 match aws_manager.lambda_client.get_function_configuration()
1896 .function_name("bedrock-monitor-function")
1897 .send()
1898 .await {
1899 Ok(config) => {
1900 let is_active = matches!(config.state(), Some(aws_sdk_lambda::types::State::Active));
1901 info!(" ✅ Lambda函数: 存在且状态: {:?}", config.state());
1902 is_active
1903 }
1904 Err(_) => {
1905 info!(" ❌ Lambda函数: 不存在或无法访问");
1906 false
1907 }
1908 }
1909 }
1910
1911 async fn check_sns_topic(&self, aws_manager: &AwsManager) -> bool {
1913 match aws_manager.sns_client.list_topics().send().await {
1914 Ok(response) => {
1915 let exists = response.topics()
1916 .iter()
1917 .any(|topic| topic.topic_arn()
1918 .unwrap_or("")
1919 .contains("bedrock-throttling-alerts"));
1920
1921 if exists {
1922 info!(" ✅ SNS主题: bedrock-throttling-alerts 存在");
1923 } else {
1924 info!(" ❌ SNS主题: bedrock-throttling-alerts 不存在");
1925 }
1926 exists
1927 }
1928 Err(_) => {
1929 info!(" ❌ SNS主题: 无法访问");
1930 false
1931 }
1932 }
1933 }
1934
1935 async fn check_eventbridge_rule(&self, aws_manager: &AwsManager) -> bool {
1937 match aws_manager.eventbridge_client.list_rules()
1938 .name_prefix("bedrock-api-monitor-rule")
1939 .send()
1940 .await {
1941 Ok(response) => {
1942 let exists = response.rules()
1943 .iter()
1944 .any(|rule| rule.name() == Some("bedrock-api-monitor-rule"));
1945
1946 if exists {
1947 info!(" ✅ EventBridge规则: bedrock-api-monitor-rule 存在");
1948 } else {
1949 info!(" ❌ EventBridge规则: bedrock-api-monitor-rule 不存在");
1950 }
1951 exists
1952 }
1953 Err(_) => {
1954 info!(" ❌ EventBridge规则: 无法访问");
1955 false
1956 }
1957 }
1958 }
1959
1960 async fn count_cloudwatch_alarms(&self, aws_manager: &AwsManager) -> i32 {
1962 match aws_manager.cloudwatch_client.describe_alarms()
1963 .alarm_name_prefix("bedrock")
1964 .send()
1965 .await {
1966 Ok(response) => {
1967 let count = response.metric_alarms().len() as i32;
1968 info!(" 📊 CloudWatch告警: {} 个", count);
1969 count
1970 }
1971 Err(_) => {
1972 info!(" ❌ CloudWatch告警: 无法访问");
1973 0
1974 }
1975 }
1976 }
1977
1978 async fn check_lambda_environment(&self, aws_manager: &AwsManager) -> bool {
1980 match aws_manager.lambda_client.get_function_configuration()
1981 .function_name("bedrock-monitor-function")
1982 .send()
1983 .await {
1984 Ok(config) => {
1985 if let Some(env) = config.environment() {
1986 if let Some(vars) = env.variables() {
1987 let has_dry_run_false = vars.get("BEDROCK_DRY_RUN_MODE") == Some(&"false".to_string());
1988 let has_auto_disable_true = vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE") == Some(&"true".to_string());
1989
1990 if has_dry_run_false && has_auto_disable_true {
1991 info!(" ✅ Lambda环境变量: 正确配置 (429检测已启用)");
1992 return true;
1993 } else {
1994 info!(" ⚠️ Lambda环境变量: 配置不完整 (DRY_RUN={}, AUTO_DISABLE={})",
1995 vars.get("BEDROCK_DRY_RUN_MODE").unwrap_or(&"未设置".to_string()),
1996 vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE").unwrap_or(&"未设置".to_string()));
1997 }
1998 }
1999 }
2000 info!(" ❌ Lambda环境变量: 未正确配置");
2001 false
2002 }
2003 Err(_) => {
2004 info!(" ❌ Lambda环境变量: 无法检查");
2005 false
2006 }
2007 }
2008 }
2009
2010 async fn check_iam_permissions(&self, aws_manager: &AwsManager) -> bool {
2012 match aws_manager.iam_client.get_role()
2013 .role_name("lambda-bedrock-monitor-role")
2014 .send()
2015 .await {
2016 Ok(_) => {
2017 info!(" ✅ IAM权限: lambda-bedrock-monitor-role 存在");
2018 true
2019 }
2020 Err(_) => {
2021 info!(" ❌ IAM权限: lambda-bedrock-monitor-role 不存在");
2022 false
2023 }
2024 }
2025 }
2026
2027 async fn print_region_status(&self, status: &RegionStatus) {
2029 let health_emoji = if status.is_healthy { "✅" } else { "❌" };
2030 info!("{} 区域 {} (耗时: {:?})", health_emoji, status.region, status.total_duration);
2031
2032 info!(" - Lambda函数: {}", if status.lambda_function { "✅" } else { "❌" });
2033 info!(" - SNS主题: {}", if status.sns_topic { "✅" } else { "❌" });
2034 info!(" - EventBridge规则: {}", if status.eventbridge_rule { "✅" } else { "❌" });
2035 info!(" - CloudWatch告警: {} 个 {}", if status.cloudwatch_alarms >= 5 { "✅" } else { "⚠️" }, status.cloudwatch_alarms);
2036 info!(" - 环境变量: {}", if status.lambda_environment { "✅" } else { "❌" });
2037 info!(" - IAM权限: {}", if status.iam_permissions { "✅" } else { "❌" });
2038
2039 if status.is_healthy {
2040 info!(" 🚨 429 throttling限制: ✅ 已激活");
2041 } else {
2042 info!(" 🚨 429 throttling限制: ❌ 未激活");
2043 }
2044 }
2045
2046 async fn print_status_summary(&self, region_statuses: &[RegionStatus]) {
2048 info!("\n{}", "=".repeat(80));
2049 info!("🎉 多区域部署状态检查完成");
2050 info!("{}", "=".repeat(80));
2051
2052 let healthy_count = region_statuses.iter().filter(|r| r.is_healthy).count();
2053 let total_count = region_statuses.len();
2054
2055 info!("📊 部署状态统计:");
2056 info!(" ✅ 完全健康的区域: {}/{}", healthy_count, total_count);
2057 info!(" ❌ 有问题的区域: {}/{}", total_count - healthy_count, total_count);
2058
2059 let lambda_count = region_statuses.iter().filter(|r| r.lambda_function).count();
2061 let sns_count = region_statuses.iter().filter(|r| r.sns_topic).count();
2062 let eventbridge_count = region_statuses.iter().filter(|r| r.eventbridge_rule).count();
2063 let env_count = region_statuses.iter().filter(|r| r.lambda_environment).count();
2064 let iam_count = region_statuses.iter().filter(|r| r.iam_permissions).count();
2065
2066 info!("\n📋 组件部署统计:");
2067 info!(" ✅ Lambda函数: {}/{} 区域", lambda_count, total_count);
2068 info!(" ✅ SNS主题: {}/{} 区域", sns_count, total_count);
2069 info!(" ✅ EventBridge规则: {}/{} 区域", eventbridge_count, total_count);
2070 info!(" ✅ Lambda环境变量: {}/{} 区域", env_count, total_count);
2071 info!(" ✅ IAM权限: {}/{} 区域", iam_count, total_count);
2072
2073 if healthy_count > 0 {
2075 info!("\n✅ 完全部署成功的区域:");
2076 for status in region_statuses.iter().filter(|r| r.is_healthy) {
2077 info!(" 🌍 {} (耗时: {:?})", status.region, status.total_duration);
2078 }
2079 }
2080
2081 if healthy_count < total_count {
2083 info!("\n❌ 需要修复的区域:");
2084 for status in region_statuses.iter().filter(|r| !r.is_healthy) {
2085 let issues = vec![
2086 (!status.lambda_function, "Lambda函数"),
2087 (!status.sns_topic, "SNS主题"),
2088 (!status.eventbridge_rule, "EventBridge规则"),
2089 (!status.lambda_environment, "环境变量"),
2090 (!status.iam_permissions, "IAM权限"),
2091 ].into_iter()
2092 .filter_map(|(missing, component)| if missing { Some(component) } else { None })
2093 .collect::<Vec<_>>();
2094
2095 info!(" 🌍 {}: 缺少 {}", status.region, issues.join(", "));
2096 }
2097 }
2098
2099 if healthy_count > 0 {
2100 info!("\n🚨 429 throttling限制状态:");
2101 info!(" ✅ 已激活区域: {}/{}", healthy_count, total_count);
2102 info!(" ⚡ 检测机制: 实时检测429错误");
2103 info!(" 🔒 自动响应: 立即禁用相关AK/SK");
2104
2105 info!("\n💡 验证建议:");
2106 info!(" 1. 在成功部署的区域测试Bedrock API调用");
2107 info!(" 2. 检查CloudWatch告警是否正常触发");
2108 info!(" 3. 验证SNS告警通知系统");
2109 info!(" 4. 测试429错误AK/SK自动禁用功能");
2110 }
2111 }
2112}
2113
/// Snapshot of the monitoring stack's deployment state in one region.
#[derive(Debug)]
pub struct RegionStatus {
    /// Name of the inspected region.
    pub region: String,
    /// Result of the Lambda function check.
    pub lambda_function: bool,
    /// Result of the SNS topic check.
    pub sns_topic: bool,
    /// Result of the EventBridge rule check.
    pub eventbridge_rule: bool,
    /// Number of CloudWatch alarms counted for the region.
    pub cloudwatch_alarms: i32,
    /// Result of the Lambda environment-variable check.
    pub lambda_environment: bool,
    /// Result of the IAM role check.
    pub iam_permissions: bool,
    /// Time spent inspecting this region.
    pub total_duration: std::time::Duration,
    /// Overall verdict for the region, derived from the checks above.
    pub is_healthy: bool,
}
2127
2128pub fn create_lambda_zip() -> Result<Vec<u8>> {
2129 use std::io::{Cursor, Write};
2130 use zip::write::FileOptions;
2131
2132 let mut buffer = Vec::new();
2133 {
2134 let mut zip = zip::ZipWriter::new(Cursor::new(&mut buffer));
2135 let options = FileOptions::default()
2136 .compression_method(zip::CompressionMethod::Deflated);
2137
2138 let python_content = r#"import json
2140import os
2141import time
2142import re
2143import hashlib
2144from datetime import datetime
2145
2146# Redis connection setup (已移除,现在使用CloudWatch日志)
2147# REDIS_HOST = os.environ.get('REDIS_HOST', 'localhost')
2148# REDIS_PORT = int(os.environ.get('REDIS_PORT', 6379))
2149# REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD', None)
2150
2151# CloudWatch Logs setup
2152CLOUDWATCH_LOG_GROUP = '/aws/lambda/bedrock-monitor-function/detailed'
2153CLOUDWATCH_LOG_STREAM = f"detailed-logs-{int(time.time())}"
2154
2155# CloudWatch Logs client
2156try:
2157 import boto3
2158 cloudwatch_logs = boto3.client('logs')
2159 CLOUDWATCH_AVAILABLE = True
2160except ImportError:
2161 CLOUDWATCH_AVAILABLE = False
2162 print("Warning: boto3 not available, CloudWatch logging disabled")
2163
2164def get_log_stream_name():
2165 """Generate unique log stream name"""
2166 return CLOUDWATCH_LOG_STREAM
2167
2168def ensure_log_group_and_stream():
2169 """Ensure log group and stream exist"""
2170 if not CLOUDWATCH_AVAILABLE:
2171 return
2172
2173 try:
2174 # Create log group if it doesn't exist
2175 try:
2176 cloudwatch_logs.create_log_group(logGroupName=CLOUDWATCH_LOG_GROUP)
2177 print(f"Created CloudWatch log group: {CLOUDWATCH_LOG_GROUP}")
2178 except cloudwatch_logs.exceptions.ResourceAlreadyExistsException:
2179 pass # Log group already exists
2180
2181 # Create log stream
2182 log_stream_name = get_log_stream_name()
2183 try:
2184 cloudwatch_logs.create_log_stream(
2185 logGroupName=CLOUDWATCH_LOG_GROUP,
2186 logStreamName=log_stream_name
2187 )
2188 print(f"Created CloudWatch log stream: {log_stream_name}")
2189 except cloudwatch_logs.exceptions.ResourceAlreadyExistsException:
2190 pass # Log stream already exists
2191
2192 except Exception as e:
2193 print(f"Error setting up CloudWatch Logs: {e}")
2194
2195
2196def sanitize_data(data, level='partial'):
2197 """Sanitize sensitive data from request/response content"""
2198 if not isinstance(data, (dict, str)):
2199 return data
2200
2201 if isinstance(data, str):
2202 # Remove potential API keys, tokens, and sensitive patterns
2203 sanitized = data
2204
2205 # Remove AWS access keys
2206 sanitized = re.sub(r'AKIA[0-9A-Z]{16}', 'AKIAXXXXXXXXXXXXXXXX', sanitized)
2207
2208 # Remove potential tokens and secrets
2209 sanitized = re.sub(r'[Bb]earer\s+[A-Za-z0-9\-._~+\/]+=*', 'bearer XXXXXXXX', sanitized)
2210 sanitized = re.sub(r'[Tt]oken\s*[:=]\s*[A-Za-z0-9\-._~+\/]+=*', 'token: XXXXXXXX', sanitized)
2211
2212 # Remove email addresses
2213 sanitized = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', 'email@domain.com', sanitized)
2214
2215 # Remove phone numbers (basic pattern)
2216 sanitized = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', 'XXX-XXX-XXXX', sanitized)
2217
2218 # Remove credit card numbers (basic pattern)
2219 sanitized = re.sub(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', 'XXXX-XXXX-XXXX-XXXX', sanitized)
2220
2221 # Remove IP addresses for privacy (optional, based on level)
2222 if level == 'full':
2223 sanitized = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', 'X.X.X.X', sanitized)
2224
2225 return sanitized
2226
2227 if isinstance(data, dict):
2228 sanitized_dict = {}
2229 for key, value in data.items():
2230 # Check for sensitive key patterns
2231 if any(pattern in key.lower() for pattern in
2232 ['password', 'token', 'key', 'secret', 'auth', 'credential', 'session']):
2233 sanitized_dict[key] = 'XXXXXXXX'
2234 elif any(pattern in key.lower() for pattern in ['email', 'mail']):
2235 sanitized_dict[key] = 'email@domain.com'
2236 elif key.lower() in ['client_ip', 'source_ip'] and level == 'full':
2237 sanitized_dict[key] = 'X.X.X.X'
2238 else:
2239 sanitized_dict[key] = sanitize_data(value, level)
2240 return sanitized_dict
2241
2242 return data
2243
2244def should_log_detailed_content(status_code, event_type='unknown'):
2245 """Determine if we should log detailed content based on status and event type"""
2246 # Always log errors and throttling in detail
2247 if status_code in [429, 500, 502, 503, 504]:
2248 return 'full'
2249
2250 # Log SNS alarms in detail
2251 if event_type == 'CloudWatchAlarm':
2252 return 'full'
2253
2254 # For successful requests, log summary only
2255 if status_code == 200:
2256 return 'summary'
2257
2258 # Default to partial for other cases
2259 return 'partial'
2260
2261def log_to_cloudwatch_detailed(log_entry, log_level='INFO'):
2262 """Send detailed log entry to CloudWatch Logs"""
2263 if not CLOUDWATCH_AVAILABLE:
2264 print(f"CloudWatch unavailable, logging to console: {log_level} - {json.dumps(log_entry)}")
2265 return
2266
2267 try:
2268 timestamp = int(time.time() * 1000)
2269
2270 log_event = {
2271 'timestamp': timestamp,
2272 'message': json.dumps(log_entry, default=str)
2273 }
2274
2275 cloudwatch_logs.put_log_events(
2276 logGroupName=CLOUDWATCH_LOG_GROUP,
2277 logStreamName=get_log_stream_name(),
2278 logEvents=[log_event]
2279 )
2280
2281 except Exception as e:
2282 print(f"Failed to send to CloudWatch Logs: {e}")
2283 # Fallback to console logging
2284 print(f"{log_level} - {json.dumps(log_entry, default=str)}")
2285
def create_detailed_log_entry(event_data, request_data, response_data, status_code, log_level='summary'):
    """Assemble a log entry whose amount of detail depends on ``log_level``.

    Args:
        event_data: Mapping; only ``event_type`` is read.
        request_data: Mapping describing the request (ids, model, caller, ...).
        response_data: Mapping describing the response.
        status_code: Status recorded in the entry.
        log_level: 'full', 'summary', or anything else (treated as partial).

    Returns:
        A dict with the always-present base fields plus the fields of the
        selected level. Request/response payloads pass through sanitize_data.
    """
    entry = {
        'timestamp': datetime.now().isoformat(),
        'status_code': status_code,
        'log_level': log_level,
        'request_id': request_data.get('request_id', 'unknown'),
        'model': request_data.get('model', 'unknown'),
        'client_ip': request_data.get('client_ip', 'unknown'),
        'aws_region': request_data.get('aws_region', 'unknown'),
        'event_type': event_data.get('event_type', 'unknown')
    }

    if log_level == 'full':
        # Complete payloads, passed through sanitize_data at the 'partial' level.
        entry['full_request'] = sanitize_data(request_data, 'partial')
        entry['full_response'] = sanitize_data(response_data, 'partial')
        entry['error_details'] = request_data.get('error_details', {})
        entry['user_agent'] = request_data.get('user_agent', 'unknown')
        entry['caller_identity'] = request_data.get('caller_identity', {})
        entry['request_parameters'] = sanitize_data(request_data.get('request_parameters', {}), 'partial')
        entry['response_elements'] = sanitize_data(response_data.get('response_elements', {}), 'partial')
        return entry

    if log_level == 'summary':
        # Headline facts only.
        entry['api_action'] = request_data.get('api_action', 'unknown')
        entry['processing_time_ms'] = request_data.get('processing_time_ms', 0)
        entry['content_type'] = response_data.get('content_type', 'unknown')
        entry['error_code'] = request_data.get('error_code', None)
        return entry

    # Partial: some detail, with the request passed through sanitize_data at the 'full' level.
    entry['api_action'] = request_data.get('api_action', 'unknown')
    entry['partial_request'] = sanitize_data(request_data, 'full')
    entry['error_code'] = request_data.get('error_code', None)
    return entry
2331
def handle_throttle_event(request_data):
    """Handle a 429/throttling event by deactivating the caller's IAM access key.

    Simplified flow with no Redis dependency: the key is deactivated on the
    first throttling event instead of after a counted threshold.

    Args:
        request_data: Mapping describing the request. Reads ``timestamp``,
            ``request_id``, ``model``, ``client_ip``, ``aws_region``,
            ``caller_identity`` (``userName`` / ``accessKeyId``) and, as a
            per-field fallback, ``event_details`` (same two keys).

    Returns:
        True when the key was deactivated or, in dry-run mode
        (environment variable BEDROCK_DRY_RUN_MODE set to 'true'), would have
        been. False when the user name or access key id cannot be determined,
        when the IAM call fails, or on any unexpected error.
    """
    try:
        import os

        def _known(value):
            """Return value unless it is missing, empty, or the 'unknown' placeholder."""
            return value if value and value != 'unknown' else None

        # Dry-run mode logs the intended action without touching IAM.
        dry_run_mode = os.environ.get('BEDROCK_DRY_RUN_MODE', 'false').lower() == 'true'

        # Key facts about the throttled request.
        timestamp = request_data.get('timestamp', int(datetime.now().timestamp()))
        request_id = request_data.get('request_id', 'unknown')
        model_id = request_data.get('model', 'unknown')
        client_ip = request_data.get('client_ip', 'unknown')
        aws_region = request_data.get('aws_region', 'unknown')

        # Resolve user name and access key id independently: prefer the caller
        # identity, fall back to the event details. Empty strings count as
        # missing, so they can never be sent to IAM as a user name or key id.
        caller_identity = request_data.get('caller_identity') or {}
        event_details = request_data.get('event_details') or {}
        username = (
            _known(caller_identity.get('userName'))
            or _known(event_details.get('userName'))
            or 'unknown'
        )
        access_key_id = (
            _known(caller_identity.get('accessKeyId'))
            or _known(event_details.get('accessKeyId'))
            or 'unknown'
        )

        print(f"🚨 429错误检测:")
        print(f" - 时间: {datetime.fromtimestamp(timestamp).isoformat()}")
        print(f" - 用户: {username}")
        print(f" - AK/SK: {access_key_id}")
        print(f" - 模型: {model_id}")
        print(f" - 源IP: {client_ip}")
        print(f" - 区域: {aws_region}")
        print(f" - 模式: {'试运行' if dry_run_mode else '正式运行'}")

        # Structured record of the event and of the action taken.
        disable_record = {
            "timestamp": datetime.fromtimestamp(timestamp).isoformat(),
            "username": username,
            "access_key_id": access_key_id,
            "model_id": model_id,
            "source_ip": client_ip,
            "request_id": request_id,
            "aws_region": aws_region,
            "event_type": "throttle_immediate_disable",
            "dry_run": dry_run_mode,
            "action_reason": "429_rate_limit_error"
        }

        print(f"📝 429错误记录: {json.dumps(disable_record)}")

        # Without both identifiers there is nothing that can be deactivated.
        if access_key_id == 'unknown' or username == 'unknown':
            print(f"⚠️ 无法获取有效的用户或AK/SK信息,跳过禁用操作")
            return False

        if dry_run_mode:
            print(f"🧪 试运行模式:AK/SK {access_key_id} 将被禁用(未实际执行)")
            disable_record["action"] = "dry_run_would_disable"
        else:
            try:
                # Imported here so dry-run and skip paths do not require boto3.
                import boto3

                iam_client = boto3.client('iam')

                # Deactivate the access key immediately (no Redis counter check).
                response = iam_client.update_access_key(
                    UserName=username,
                    AccessKeyId=access_key_id,
                    Status='Inactive'
                )

                print(f"✅ 成功禁用AK/SK: {access_key_id}")
                print(f"🔒 用户 {username} 的访问密钥已被禁用")

                disable_record.update({
                    "action": "access_key_disabled",
                    "status": "success",
                    "iam_response": str(response)
                })

            except Exception as iam_error:
                print(f"❌ IAM禁用AK/SK失败: {iam_error}")

                disable_record.update({
                    "action": "access_key_disable_failed",
                    "status": "error",
                    "error": str(iam_error)
                })

                # Emit the failure record too; it was previously built and then dropped.
                print(f"📋 最终操作记录: {json.dumps(disable_record)}")
                return False

        # Final record of what was done.
        print(f"📋 最终操作记录: {json.dumps(disable_record)}")

        return True

    except Exception as e:
        print(f"❌ 处理429错误时发生异常: {e}")
        return False
2438
2439
def record_detailed_request(request_data):
    """Print a structured log line for one Bedrock API call (no Redis dependency).

    Args:
        request_data: Mapping describing the request; missing fields fall back
            to 'unknown' (or 0 for the status code, the current time for the
            timestamp).

    Returns:
        True once the entry has been printed (and, for status 429, after
        handle_throttle_event has been invoked); False if anything raised.
    """
    try:
        epoch_seconds = request_data.get('timestamp', int(datetime.now().timestamp()))
        status = request_data.get('status_code', 0)

        record = {
            "timestamp": datetime.fromtimestamp(epoch_seconds).isoformat(),
            "status_code": status,
            "model": request_data.get('model', 'unknown'),
            "client_ip": request_data.get('client_ip', 'unknown'),
            "request_id": request_data.get('request_id', 'unknown'),
            "api_action": request_data.get('api_action', 'unknown'),
            "aws_region": request_data.get('aws_region', 'unknown'),
            "user_agent": request_data.get('user_agent', 'unknown'),
            "log_type": "bedrock_api_call"
        }

        # Printed output is the log record.
        serialized = json.dumps(record)
        print(f"📊 Bedrock API调用记录: {serialized}")

        # A 429 immediately triggers the access-key handling.
        if status == 429:
            handle_throttle_event(request_data)

    except Exception as e:
        print(f"❌ 记录请求信息失败: {e}")
        return False

    return True
2474
def record_basic_status(status_code, timestamp):
    """Print a minimal status-count log line (replaces the former Redis counter).

    Args:
        status_code: Status to record.
        timestamp: Epoch seconds of the event.

    Returns:
        True when the line was printed, False if building or printing it raised.
    """
    try:
        serialized = json.dumps({
            "timestamp": datetime.fromtimestamp(timestamp).isoformat(),
            "status_code": status_code,
            "log_type": "bedrock_status_count"
        })
        print(f"📈 Bedrock状态计数: {serialized}")
    except Exception as e:
        print(f"❌ 记录状态失败: {e}")
        return False

    return True
2491
def extract_bedrock_details_from_event(event):
    """Extract request and response details from an EventBridge CloudTrail event.

    ``requestParameters``, ``responseElements`` and ``userIdentity`` are each
    treated as an empty mapping when absent or null, so a throttled call that
    carries no parameters is still recognised instead of being discarded.

    Args:
        event: The incoming event; must contain a ``detail`` mapping.

    Returns:
        A dict describing the call (status code, model, caller identity,
        throttling flag, ...), or None when the event has no ``detail`` key or
        cannot be parsed. The status code is forced to 429 whenever the error
        code or the response error indicates throttling.
    """
    throttle_codes = ('ThrottlingException', 'ServiceQuotaExceededException')
    try:
        if 'detail' not in event:
            return None

        detail = event['detail']

        # Any of these may be present but null; normalise to empty mappings.
        request_params = detail.get('requestParameters') or {}
        response_elements = detail.get('responseElements') or {}
        user_identity = detail.get('userIdentity') or {}

        status_code = response_elements.get('httpStatusCode', 0)
        error_code = detail.get('errorCode', '')
        response_error = response_elements.get('error', '')

        # Throttling can surface as the HTTP status, the top-level error code,
        # or inside the response error (either a mapping or free text).
        is_throttled = (
            status_code == 429 or
            error_code in throttle_codes or
            (isinstance(response_error, dict) and response_error.get('code') in throttle_codes) or
            (isinstance(response_error, str) and any(code in response_error for code in throttle_codes))
        )

        # Normalise every throttling signal to status 429.
        if is_throttled:
            status_code = 429
            print(f"🚨 检测到429/限流错误: status={status_code}, errorCode={error_code}, responseError={response_error}")

        fallback_request_id = detail.get('requestID', 'unknown')

        request_data = {
            'timestamp': int(datetime.now().timestamp()),
            'event_time': detail.get('eventTime', ''),
            'request_id': response_elements.get('requestId', fallback_request_id),
            'model': request_params.get('modelId', 'unknown'),
            'api_action': detail.get('eventName', 'unknown'),
            'aws_region': detail.get('awsRegion', ''),
            # Status code after the throttling normalisation above.
            'status_code': status_code,
            'client_ip': detail.get('sourceIPAddress', 'unknown'),
            'user_agent': detail.get('userAgent', 'unknown'),
            'error_code': error_code,
            'content_type': response_elements.get('contentType', None),
            'caller_identity': {
                'account_id': user_identity.get('accountId', ''),
                'principal_id': user_identity.get('principalId', ''),
                'type': user_identity.get('type', ''),
                'arn': user_identity.get('arn', ''),
                'userName': user_identity.get('userName', ''),
                'accessKeyId': user_identity.get('accessKeyId', '')
            },
            'request_parameters': {
                # Only the model id parameter is copied; no request body is read here.
                'model_id': request_params.get('modelId', ''),
            },
            'response_elements': {
                'http_status': response_elements.get('httpStatusCode', 0),
                'content_type': response_elements.get('contentType', ''),
                'request_id': response_elements.get('requestId', ''),
                'error': response_elements.get('error', None)
            },
            'raw_error_code': error_code,
            'raw_response_error': response_error,
            'is_throttled': is_throttled
        }

        print(f"📋 提取的请求数据: status_code={status_code}, is_throttled={is_throttled}")
        print(f"📊 调试信息: errorCode={error_code}, responseError={response_error}")

        return request_data

    except Exception as e:
        print(f"❌ 从EventBridge事件提取详情时出错: {e}")
        return None
2566
def extract_bedrock_status_from_event(event):
    """Return only the status code of an EventBridge CloudTrail event.

    Kept for backwards compatibility; delegates to
    extract_bedrock_details_from_event.

    Returns:
        The extracted status code, None when no details could be extracted,
        or 200 if an exception is raised while reading them.
    """
    try:
        details = extract_bedrock_details_from_event(event)
        return details['status_code'] if details else None
    except Exception as e:
        print(f"Error extracting status from EventBridge event: {e}")
        return 200
2577
def handle_sns_alarm(event):
    """Process CloudWatch alarm notifications delivered through SNS.

    Every record that carries an ``Sns`` payload is decoded, logged, and, when
    the alarm name looks throttling-related and the alarm is in the ALARM
    state, forwarded to handle_sns_triggered_429.

    Args:
        event: SNS-shaped event with a ``Records`` list.

    Returns:
        None when the event has no ``Records`` key; otherwise a result dict
        with ``status`` 'success' (including ``alarms_processed``) or, if
        anything raised, ``status`` 'error'.
    """
    try:
        print(f"🚨 处理CloudWatch SNS告警事件: {json.dumps(event)}")

        # Nothing to do without SNS records.
        if 'Records' not in event:
            print("❌ SNS事件中没有找到记录")
            return None

        processed = 0
        for entry in event['Records']:
            if 'Sns' in entry:
                message = json.loads(entry['Sns']['Message'])
                print(f"📋 SNS消息: {json.dumps(message)}")

                # Core alarm attributes.
                name = message.get('AlarmName', 'unknown')
                arn = message.get('AlarmArn', 'unknown')
                state = message.get('NewStateValue', 'unknown')
                reason = message.get('NewStateReason', 'unknown')

                # Name-based heuristic for alarms treated as 429-related.
                throttle_related = (
                    '429' in name
                    or 'throttle' in name.lower()
                    or any(tag in name for tag in ('InvocationThrottles', 'Bedrock'))
                )

                # Alarm configuration as reported in the trigger section.
                trigger = message.get('Trigger', {})
                metric = trigger.get('MetricName', 'unknown')
                metric_namespace = trigger.get('Namespace', 'unknown')
                limit = trigger.get('Threshold', 0)

                for line in (
                    f"🔔 CloudWatch告警详情:",
                    f" - 名称: {name}",
                    f" - 状态: {state}",
                    f" - 原因: {reason}",
                    f" - 指标: {metric}",
                    f" - 命名空间: {metric_namespace}",
                    f" - 阈值: {limit}",
                    f" - 是否429相关: {throttle_related}",
                ):
                    print(line)

                # Full alarm record for the log.
                alarm_record = {
                    'timestamp': datetime.now().isoformat(),
                    'alarm_name': name,
                    'alarm_arn': arn,
                    'state_value': state,
                    'state_reason': reason,
                    'region': message.get('Region', os.environ.get('AWS_REGION', '')),
                    'event_type': 'CloudWatchAlarm',
                    'is_429_related': throttle_related,
                    'alarm_configuration': {
                        'metric': metric,
                        'namespace': metric_namespace,
                        'threshold': limit,
                        'comparison_operator': trigger.get('ComparisonOperator', 'unknown'),
                        'evaluation_periods': trigger.get('EvaluationPeriods', 0)
                    },
                    'dimensions': trigger.get('Dimensions', []),
                    'raw_sns_message': message
                }
                print(f"📝 记录CloudWatch告警: {json.dumps(alarm_record)}")

                # A firing throttling alarm is escalated right away.
                if throttle_related and state == 'ALARM':
                    print(f"🚨 检测到429相关告警触发!")
                    print(f" - 告警名称: {name}")
                    print(f" - 阈值: {limit} 次")
                    print(f" - 状态: {state}")

                    # Synthetic event for the alarm-driven handler; the metric
                    # value is filled with the threshold.
                    synthetic_event = {
                        "timestamp": datetime.now().isoformat(),
                        "event_type": "SNS_Alarm_429_Detection",
                        "alarm_name": name,
                        "metric_value": limit,
                        "threshold": limit,
                        "state_reason": reason,
                        "trigger_source": "CloudWatch_Metric_Alarm"
                    }
                    handle_sns_triggered_429(synthetic_event)

                processed += 1

        print(f"✅ 成功处理了 {processed} 个SNS告警事件")

        return {
            'status': 'success',
            'message': f'SNS告警处理成功,共处理 {processed} 个告警',
            'alarms_processed': processed,
            'recorded': True
        }

    except Exception as e:
        print(f"❌ 处理SNS告警时出错: {e}")
        return {
            'status': 'error',
            'message': f'处理SNS告警失败: {str(e)}',
            'recorded': False
        }
2688
def handle_sns_triggered_429(alarm_event):
    """Log a throttling condition that was reported by a CloudWatch alarm via SNS.

    No credential is disabled here: the function only prints a structured log
    entry and operator hints.

    Args:
        alarm_event: Mapping with optional ``alarm_name``, ``metric_value``,
            ``threshold``, ``state_reason`` and ``trigger_source`` keys.

    Returns:
        True after logging, False if anything raised.
    """
    try:
        print(f"🔥 处理SNS告警触发的429事件: {json.dumps(alarm_event)}")

        # Structured response record built from the alarm event fields.
        response_record = {
            'timestamp': datetime.now().isoformat(),
            'event_type': 'SNS_Alarm_429_Response',
            'alarm_name': alarm_event.get('alarm_name', 'unknown'),
            'metric_value': alarm_event.get('metric_value', 0),
            'threshold': alarm_event.get('threshold', 0),
            'state_reason': alarm_event.get('state_reason', 'CloudWatch alarm triggered'),
            'trigger_source': alarm_event.get('trigger_source', 'CloudWatch'),
            'action_taken': 'Alarm logged and monitored',
            'recommendation': 'Check recent Bedrock API usage and consider rate limiting'
        }
        print(f"📝 SNS触发的429日志: {json.dumps(response_record)}")

        # Operator hints printed after the log entry.
        for hint in (
            f"💡 注意: SNS告警不包含具体的AK/SK信息",
            f" - 建议检查CloudTrail日志获取具体的用户信息",
            f" - 考虑实施账户级别的限流策略",
        ):
            print(hint)

    except Exception as e:
        print(f"❌ 处理SNS触发的429事件时出错: {e}")
        return False

    return True
2729
2730
def lambda_handler(event, context):
    """AWS Lambda entry point: classify the event, record it, and react to throttling.

    Flow:
      1. SNS events (CloudWatch alarms) are delegated to handle_sns_alarm.
      2. EventBridge CloudTrail events are parsed with
         extract_bedrock_details_from_event; anything else is treated as a
         direct invocation whose fields are read from ``event`` itself.
      3. The status is recorded, throttling is handled exactly once, and a
         detailed entry is sent to CloudWatch Logs.

    Args:
        event: The Lambda event mapping.
        context: The Lambda context object, or None.

    Returns:
        A dict with ``statusCode`` and a JSON ``body``. The status code is 200
        for every processed event and 500 only when processing itself raised.
    """
    throttle_codes = ['ThrottlingException', 'ServiceQuotaExceededException']

    try:
        print(f"Received event: {json.dumps(event)}")

        # Initialize CloudWatch Logs if available.
        if CLOUDWATCH_AVAILABLE:
            ensure_log_group_and_stream()

        # SNS events (CloudWatch alarms) are handled separately and return early.
        if 'Records' in event and len(event['Records']) > 0 and 'Sns' in event['Records'][0]:
            print("Detected SNS event, processing as CloudWatch alarm")
            sns_result = handle_sns_alarm(event)
            return {
                'statusCode': 200,
                'body': json.dumps(sns_result),
                'headers': {'Content-Type': 'application/json'}
            }

        # Try to read the details from an EventBridge event first.
        request_data = extract_bedrock_details_from_event(event)
        status_code = None

        if request_data:
            status_code = request_data['status_code']
            print(f"Extracted from EventBridge - Status: {status_code}, Model: {request_data['model']}, Client IP: {request_data['client_ip']}")
        else:
            # Not an EventBridge event: use the direct invocation payload.
            status_code = event.get('status_code', 200)
            request_data = {
                'timestamp': int(datetime.now().timestamp()),
                'request_id': event.get('request_id', context.aws_request_id if context else 'unknown'),
                'model': event.get('model_id', 'unknown'),
                'api_action': 'DirectInvoke',
                'aws_region': os.environ.get('AWS_REGION', ''),
                'status_code': status_code,
                'client_ip': event.get('client_ip', 'lambda-direct'),
                'user_agent': event.get('user_agent', 'lambda-function'),
                'error_code': event.get('error', None),
                'content_type': None,
                'caller_identity': {
                    'account_id': '',
                    'principal_id': 'lambda',
                    'type': 'Lambda',
                    'arn': context.invoked_function_arn if context else ''
                },
                'request_parameters': event.get('request_parameters', {}),
                'response_elements': event.get('response_elements', {})
            }
            print(f"Using direct call data - Status: {status_code}, Model: {request_data['model']}")

        # Log the status (this goes to CloudWatch).
        print(f"Bedrock API Status: {status_code}")

        # SNS events already returned above, so only two sources remain. This
        # no longer indexes into ``Records``, which raised IndexError when the
        # list was empty.
        event_type = "EventBridge" if 'detail' in event else "Direct"
        log_level = should_log_detailed_content(status_code, event_type)

        # Response data for logging.
        response_data = {
            'content_type': request_data.get('content_type'),
            'response_elements': request_data.get('response_elements', {}),
            'processing_time_ms': request_data.get('processing_time_ms', 0)
        }

        print(f"Recording information for status {status_code} with log level: {log_level}")

        # 200 and 429 get a detailed record; every status gets the basic count.
        # record_detailed_request itself invokes handle_throttle_event for a
        # 429, so remember whether that happened to avoid a second invocation.
        throttle_already_handled = False
        if status_code in [200, 429]:
            recorded = record_detailed_request(request_data)
            throttle_already_handled = bool(recorded) and status_code == 429
        record_basic_status(status_code, int(datetime.now().timestamp()))

        # Throttling is signalled by the status code or by the error code
        # (the latter regardless of the status code).
        if status_code == 429 or request_data.get('error_code') in throttle_codes:
            print(f"🚨 检测到429/Throttling错误!状态码: {status_code}, 错误码: {request_data.get('error_code')}")
            if not throttle_already_handled:
                try:
                    handle_throttle_event(request_data)
                except Exception as e:
                    print(f"❌ 处理429事件失败: {e}")

        # Log to CloudWatch Logs with the selected detail level.
        try:
            detailed_log_entry = create_detailed_log_entry(
                event_data={'event_type': event_type, 'raw_event': sanitize_data(event, 'full')},
                request_data=request_data,
                response_data=response_data,
                status_code=status_code,
                log_level=log_level
            )

            log_to_cloudwatch_detailed(detailed_log_entry, 'INFO' if status_code == 200 else 'ERROR')
            print(f"Logged to CloudWatch with level: {log_level}")

        except Exception as e:
            print(f"Failed to log to CloudWatch: {e}")

        # Build the response body for the status code.
        if status_code == 200:
            response_body = {
                "status": "success",
                "message": "Bedrock request completed successfully",
                "recorded": True,
                "event_type": event_type,
                "request_id": request_data['request_id'],
                "model": request_data['model'],
                "client_ip": request_data['client_ip']
            }
        elif status_code == 429:
            response_body = {
                "status": "rate_limit",
                "message": "Bedrock rate limit exceeded",
                "recorded": True,
                "event_type": event_type,
                "request_id": request_data['request_id'],
                "model": request_data['model'],
                "client_ip": request_data['client_ip']
            }
        elif status_code == 400:
            # Reported as not recorded per requirement.
            response_body = {
                "status": "bad_request",
                "message": "Bedrock bad request",
                "recorded": False,
                "event_type": event_type
            }
        elif status_code == 500:
            # Reported as not recorded per requirement.
            response_body = {
                "status": "server_error",
                "message": "Bedrock server error",
                "recorded": False,
                "event_type": event_type
            }
        else:
            response_body = {
                "status": "processed",
                "message": f"Bedrock status {status_code} received",
                "recorded": False,
                "event_type": event_type
            }

        # Always return 200 for EventBridge.
        return {
            'statusCode': 200,
            'body': json.dumps(response_body)
        }

    except Exception as e:
        print(f"Error processing Lambda function: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'status': 'error',
                'message': f'Internal server error: {str(e)}',
                'recorded': False
            })
        }
2897"#;
2898
2899 zip.start_file("lambda_function.py", options)?;
2900 zip.write_all(python_content.as_bytes())?;
2901
2902 zip.finish()?;
2903 }
2904
2905 Ok(buffer)
2906}