1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_core::validation::*;
12
13use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
14use fakecloud_logs::state::SharedLogsState;
15
16use crate::state::{
17 ApiDestination, Archive, Connection, Endpoint, EventBus, EventRule, EventTarget,
18 PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
19};
20
/// Request handler for the EventBridge (`"events"`) service.
///
/// Bundles the shared EventBridge state with a delivery bus and optional handles
/// to Lambda and Logs state, which are attached through `with_lambda` /
/// `with_logs` after construction.
pub struct EventBridgeService {
    /// Shared EventBridge state (event buses, rules, partner event sources, account id).
    state: SharedEventBridgeState,
    /// Shared delivery bus handle supplied at construction.
    delivery: Arc<DeliveryBus>,
    /// Lambda state; `None` until `with_lambda` is called.
    lambda_state: Option<SharedLambdaState>,
    /// Logs state; `None` until `with_logs` is called.
    logs_state: Option<SharedLogsState>,
}
27
28impl EventBridgeService {
29 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
30 Self {
31 state,
32 delivery,
33 lambda_state: None,
34 logs_state: None,
35 }
36 }
37
38 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
39 self.lambda_state = Some(lambda_state);
40 self
41 }
42
43 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
44 self.logs_state = Some(logs_state);
45 self
46 }
47}
48
49#[async_trait]
50impl AwsService for EventBridgeService {
51 fn service_name(&self) -> &str {
52 "events"
53 }
54
55 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
56 match req.action.as_str() {
57 "CreateEventBus" => self.create_event_bus(&req),
58 "DeleteEventBus" => self.delete_event_bus(&req),
59 "ListEventBuses" => self.list_event_buses(&req),
60 "DescribeEventBus" => self.describe_event_bus(&req),
61 "PutRule" => self.put_rule(&req),
62 "DeleteRule" => self.delete_rule(&req),
63 "ListRules" => self.list_rules(&req),
64 "DescribeRule" => self.describe_rule(&req),
65 "EnableRule" => self.enable_rule(&req),
66 "DisableRule" => self.disable_rule(&req),
67 "PutTargets" => self.put_targets(&req),
68 "RemoveTargets" => self.remove_targets(&req),
69 "ListTargetsByRule" => self.list_targets_by_rule(&req),
70 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
71 "PutEvents" => self.put_events(&req),
72 "PutPermission" => self.put_permission(&req),
73 "RemovePermission" => self.remove_permission(&req),
74 "TagResource" => self.tag_resource(&req),
75 "UntagResource" => self.untag_resource(&req),
76 "ListTagsForResource" => self.list_tags_for_resource(&req),
77 "CreateArchive" => self.create_archive(&req),
78 "DescribeArchive" => self.describe_archive(&req),
79 "ListArchives" => self.list_archives(&req),
80 "UpdateArchive" => self.update_archive(&req),
81 "DeleteArchive" => self.delete_archive(&req),
82 "CreateConnection" => self.create_connection(&req),
83 "DescribeConnection" => self.describe_connection(&req),
84 "ListConnections" => self.list_connections(&req),
85 "UpdateConnection" => self.update_connection(&req),
86 "DeleteConnection" => self.delete_connection(&req),
87 "CreateApiDestination" => self.create_api_destination(&req),
88 "DescribeApiDestination" => self.describe_api_destination(&req),
89 "ListApiDestinations" => self.list_api_destinations(&req),
90 "UpdateApiDestination" => self.update_api_destination(&req),
91 "DeleteApiDestination" => self.delete_api_destination(&req),
92 "StartReplay" => self.start_replay(&req),
93 "DescribeReplay" => self.describe_replay(&req),
94 "ListReplays" => self.list_replays(&req),
95 "CancelReplay" => self.cancel_replay(&req),
96 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
97 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
98 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
99 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
100 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
101 "ActivateEventSource" => self.activate_event_source(&req),
102 "DeactivateEventSource" => self.deactivate_event_source(&req),
103 "DescribeEventSource" => self.describe_event_source(&req),
104 "ListEventSources" => self.list_event_sources(&req),
105 "PutPartnerEvents" => self.put_partner_events(&req),
106 "TestEventPattern" => self.test_event_pattern(&req),
107 "UpdateEventBus" => self.update_event_bus(&req),
108 "CreateEndpoint" => self.create_endpoint(&req),
109 "DeleteEndpoint" => self.delete_endpoint(&req),
110 "DescribeEndpoint" => self.describe_endpoint(&req),
111 "ListEndpoints" => self.list_endpoints(&req),
112 "UpdateEndpoint" => self.update_endpoint(&req),
113 "DeauthorizeConnection" => self.deauthorize_connection(&req),
114 _ => Err(AwsServiceError::action_not_implemented(
115 "events",
116 &req.action,
117 )),
118 }
119 }
120
121 fn supported_actions(&self) -> &[&str] {
122 &[
123 "CreateEventBus",
124 "DeleteEventBus",
125 "ListEventBuses",
126 "DescribeEventBus",
127 "PutRule",
128 "DeleteRule",
129 "ListRules",
130 "DescribeRule",
131 "EnableRule",
132 "DisableRule",
133 "PutTargets",
134 "RemoveTargets",
135 "ListTargetsByRule",
136 "ListRuleNamesByTarget",
137 "PutEvents",
138 "PutPermission",
139 "RemovePermission",
140 "TagResource",
141 "UntagResource",
142 "ListTagsForResource",
143 "CreateArchive",
144 "DescribeArchive",
145 "ListArchives",
146 "UpdateArchive",
147 "DeleteArchive",
148 "CreateConnection",
149 "DescribeConnection",
150 "ListConnections",
151 "UpdateConnection",
152 "DeleteConnection",
153 "CreateApiDestination",
154 "DescribeApiDestination",
155 "ListApiDestinations",
156 "UpdateApiDestination",
157 "DeleteApiDestination",
158 "StartReplay",
159 "DescribeReplay",
160 "ListReplays",
161 "CancelReplay",
162 "CreatePartnerEventSource",
163 "DeletePartnerEventSource",
164 "DescribePartnerEventSource",
165 "ListPartnerEventSources",
166 "ListPartnerEventSourceAccounts",
167 "ActivateEventSource",
168 "DeactivateEventSource",
169 "DescribeEventSource",
170 "ListEventSources",
171 "PutPartnerEvents",
172 "TestEventPattern",
173 "UpdateEventBus",
174 "CreateEndpoint",
175 "DeleteEndpoint",
176 "DescribeEndpoint",
177 "ListEndpoints",
178 "UpdateEndpoint",
179 "DeauthorizeConnection",
180 ]
181 }
182}
183
184fn parse_body(req: &AwsRequest) -> Value {
185 serde_json::from_slice(&req.body).unwrap_or(Value::Object(Default::default()))
186}
187
188fn json_resp(body: Value) -> AwsResponse {
189 AwsResponse::json(StatusCode::OK, serde_json::to_string(&body).unwrap())
190}
191
192fn parse_tags(body: &Value) -> HashMap<String, String> {
193 let mut tags = HashMap::new();
194 if let Some(arr) = body["Tags"].as_array() {
195 for tag in arr {
196 if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
197 tags.insert(key.to_string(), val.to_string());
198 }
199 }
200 }
201 tags
202}
203
204fn parse_target(target: &Value) -> EventTarget {
205 EventTarget {
206 id: target["Id"].as_str().unwrap_or("").to_string(),
207 arn: target["Arn"].as_str().unwrap_or("").to_string(),
208 input: target["Input"].as_str().map(|s| s.to_string()),
209 input_path: target["InputPath"].as_str().map(|s| s.to_string()),
210 input_transformer: target.get("InputTransformer").cloned(),
211 sqs_parameters: target.get("SqsParameters").cloned(),
212 }
213}
214
215fn target_to_json(t: &EventTarget) -> Value {
216 let mut obj = json!({ "Id": t.id, "Arn": t.arn });
217 if let Some(ref input) = t.input {
218 obj["Input"] = json!(input);
219 }
220 if let Some(ref input_path) = t.input_path {
221 obj["InputPath"] = json!(input_path);
222 }
223 if let Some(ref it) = t.input_transformer {
224 obj["InputTransformer"] = it.clone();
225 }
226 if let Some(ref sp) = t.sqs_parameters {
227 obj["SqsParameters"] = sp.clone();
228 }
229 obj
230}
231
232impl EventBridgeService {
234 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
235 let body = parse_body(req);
236 validate_required("Name", &body["Name"])?;
237 let name = body["Name"]
238 .as_str()
239 .ok_or_else(|| missing("Name"))?
240 .to_string();
241 validate_string_length("name", &name, 1, 256)?;
242 validate_optional_string_length(
243 "eventSourceName",
244 body["EventSourceName"].as_str(),
245 1,
246 256,
247 )?;
248 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
249 validate_optional_string_length(
250 "kmsKeyIdentifier",
251 body["KmsKeyIdentifier"].as_str(),
252 0,
253 2048,
254 )?;
255
256 if name.contains('/') && !name.starts_with("aws.partner/") {
258 return Err(AwsServiceError::aws_error(
259 StatusCode::BAD_REQUEST,
260 "ValidationException",
261 "Event bus name must not contain '/'.",
262 ));
263 }
264
265 if name.starts_with("aws.partner/") {
267 let event_source = body["EventSourceName"].as_str().unwrap_or("");
268 let state_r = self.state.read();
269 let has_source = state_r.partner_event_sources.contains_key(event_source);
270 drop(state_r);
271 if !has_source {
272 return Err(AwsServiceError::aws_error(
273 StatusCode::BAD_REQUEST,
274 "ResourceNotFoundException",
275 format!("Event source {event_source} does not exist."),
276 ));
277 }
278 }
279
280 let mut state = self.state.write();
281
282 if state.buses.contains_key(&name) {
283 return Err(AwsServiceError::aws_error(
284 StatusCode::BAD_REQUEST,
285 "ResourceAlreadyExistsException",
286 format!("Event bus {name} already exists."),
287 ));
288 }
289
290 let arn = format!(
291 "arn:aws:events:{}:{}:event-bus/{}",
292 req.region, state.account_id, name
293 );
294 let now = Utc::now();
295 let description = body["Description"].as_str().map(|s| s.to_string());
296 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
297 let dead_letter_config = body.get("DeadLetterConfig").cloned();
298
299 let tags = parse_tags(&body);
300
301 let bus = EventBus {
302 name: name.clone(),
303 arn: arn.clone(),
304 tags,
305 policy: None,
306 description,
307 kms_key_identifier,
308 dead_letter_config,
309 creation_time: now,
310 last_modified_time: now,
311 };
312 state.buses.insert(name, bus);
313
314 Ok(json_resp(json!({ "EventBusArn": arn })))
315 }
316
317 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
318 let body = parse_body(req);
319 validate_required("Name", &body["Name"])?;
320 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
321 validate_string_length("name", name, 1, 256)?;
322
323 if name == "default" {
324 return Err(AwsServiceError::aws_error(
325 StatusCode::BAD_REQUEST,
326 "ValidationException",
327 format!("Cannot delete event bus {name}."),
328 ));
329 }
330
331 let mut state = self.state.write();
332 state.buses.remove(name);
333 state.rules.retain(|k, _| k.0 != name);
334
335 Ok(json_resp(json!({})))
336 }
337
338 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
339 let body = parse_body(req);
340 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
341 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
342 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
343 let name_prefix = body["NamePrefix"].as_str();
344 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
345 let offset: usize = match body["NextToken"].as_str() {
346 Some(t) => t.parse().map_err(|_| {
347 AwsServiceError::aws_error(
348 StatusCode::BAD_REQUEST,
349 "InvalidNextTokenException",
350 format!("Invalid NextToken value: '{t}'"),
351 )
352 })?,
353 None => 0,
354 };
355
356 let state = self.state.read();
357 let filtered: Vec<Value> = state
358 .buses
359 .values()
360 .filter(|b| match name_prefix {
361 Some(prefix) => b.name.starts_with(prefix),
362 None => true,
363 })
364 .skip(offset)
365 .take(limit + 1)
366 .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
367 .collect();
368
369 let has_more = filtered.len() > limit;
370 let buses: Vec<Value> = filtered.into_iter().take(limit).collect();
371 let mut resp = json!({ "EventBuses": buses });
372 if has_more {
373 resp["NextToken"] = json!((offset + limit).to_string());
374 }
375
376 Ok(json_resp(resp))
377 }
378
379 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
380 let body = parse_body(req);
381 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
382 let name = body["Name"].as_str().unwrap_or("default");
383
384 let state = self.state.read();
385 let bus = state.buses.get(name).ok_or_else(|| {
386 AwsServiceError::aws_error(
387 StatusCode::BAD_REQUEST,
388 "ResourceNotFoundException",
389 format!("Event bus {name} does not exist."),
390 )
391 })?;
392
393 let mut resp = json!({
394 "Name": bus.name,
395 "Arn": bus.arn,
396 "CreationTime": bus.creation_time.timestamp() as f64,
397 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
398 });
399
400 if let Some(ref policy) = bus.policy {
401 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
402 }
403 if let Some(ref desc) = bus.description {
404 resp["Description"] = json!(desc);
405 }
406 if let Some(ref kms) = bus.kms_key_identifier {
407 resp["KmsKeyIdentifier"] = json!(kms);
408 }
409 if let Some(ref dlc) = bus.dead_letter_config {
410 resp["DeadLetterConfig"] = dlc.clone();
411 }
412
413 Ok(json_resp(resp))
414 }
415
416 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
419 let body = parse_body(req);
420 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
421 validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
422 validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
423 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
424 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
425
426 let mut state = self.state.write();
427
428 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
429 AwsServiceError::aws_error(
430 StatusCode::BAD_REQUEST,
431 "ResourceNotFoundException",
432 format!("Event bus {event_bus_name} does not exist."),
433 )
434 })?;
435
436 if let Some(policy_str) = body["Policy"].as_str() {
438 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
439 bus.policy = Some(policy);
440 return Ok(json_resp(json!({})));
441 }
442 }
443
444 let action = body["Action"].as_str().unwrap_or("");
446 let principal = body["Principal"].as_str().unwrap_or("");
447 let statement_id = body["StatementId"].as_str().unwrap_or("");
448
449 if action != "events:PutEvents" {
451 return Err(AwsServiceError::aws_error(
452 StatusCode::BAD_REQUEST,
453 "ValidationException",
454 "Provided value in parameter 'action' is not supported.",
455 ));
456 }
457
458 let statement = json!({
459 "Sid": statement_id,
460 "Effect": "Allow",
461 "Principal": { "AWS": format!("arn:aws:iam::{principal}:root") },
462 "Action": action,
463 "Resource": bus.arn,
464 });
465
466 let policy = bus.policy.get_or_insert_with(|| {
467 json!({
468 "Version": "2012-10-17",
469 "Statement": [],
470 })
471 });
472
473 if let Some(stmts) = policy["Statement"].as_array_mut() {
474 stmts.push(statement);
475 }
476
477 Ok(json_resp(json!({})))
478 }
479
480 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
481 let body = parse_body(req);
482 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
483 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
484 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
485 let statement_id = body["StatementId"].as_str().unwrap_or("");
486 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
487
488 let mut state = self.state.write();
489
490 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
491 AwsServiceError::aws_error(
492 StatusCode::BAD_REQUEST,
493 "ResourceNotFoundException",
494 format!("Event bus {event_bus_name} does not exist."),
495 )
496 })?;
497
498 if remove_all {
499 bus.policy = None;
500 return Ok(json_resp(json!({})));
501 }
502
503 let policy = bus.policy.as_mut().ok_or_else(|| {
504 AwsServiceError::aws_error(
505 StatusCode::BAD_REQUEST,
506 "ResourceNotFoundException",
507 "EventBus does not have a policy.",
508 )
509 })?;
510
511 if let Some(stmts) = policy["Statement"].as_array_mut() {
512 let before = stmts.len();
513 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
514 if stmts.len() == before {
515 return Err(AwsServiceError::aws_error(
516 StatusCode::BAD_REQUEST,
517 "ResourceNotFoundException",
518 "Statement with the provided id does not exist.",
519 ));
520 }
521 if stmts.is_empty() {
522 bus.policy = None;
523 }
524 }
525
526 Ok(json_resp(json!({})))
527 }
528
529 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
532 let body = parse_body(req);
533 validate_required("Name", &body["Name"])?;
534 let name = body["Name"]
535 .as_str()
536 .ok_or_else(|| missing("Name"))?
537 .to_string();
538 validate_string_length("name", &name, 1, 64)?;
539 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
540 validate_optional_string_length(
541 "scheduleExpression",
542 body["ScheduleExpression"].as_str(),
543 0,
544 256,
545 )?;
546 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
547 validate_optional_enum(
548 "state",
549 body["State"].as_str(),
550 &[
551 "ENABLED",
552 "DISABLED",
553 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
554 ],
555 )?;
556 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
557 validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
558
559 let raw_bus = body["EventBusName"]
560 .as_str()
561 .unwrap_or("default")
562 .to_string();
563
564 let mut state = self.state.write();
565 let event_bus_name = state.resolve_bus_name(&raw_bus);
566
567 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
568 if s.is_empty() {
569 None
570 } else {
571 Some(s.to_string())
572 }
573 });
574 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
575 if s.is_empty() {
576 None
577 } else {
578 Some(s.to_string())
579 }
580 });
581 let description = body["Description"].as_str().map(|s| s.to_string());
582 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
583 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
584
585 if schedule_expression.is_some() && event_bus_name != "default" {
587 return Err(AwsServiceError::aws_error(
588 StatusCode::BAD_REQUEST,
589 "ValidationException",
590 "ScheduleExpression is supported only on the default event bus.",
591 ));
592 }
593
594 if !state.buses.contains_key(&event_bus_name) {
595 return Err(AwsServiceError::aws_error(
596 StatusCode::BAD_REQUEST,
597 "ResourceNotFoundException",
598 format!("Event bus {event_bus_name} does not exist."),
599 ));
600 }
601
602 let arn = if event_bus_name == "default" {
603 format!(
604 "arn:aws:events:{}:{}:rule/{}",
605 req.region, state.account_id, name
606 )
607 } else {
608 format!(
609 "arn:aws:events:{}:{}:rule/{}/{}",
610 req.region, state.account_id, event_bus_name, name
611 )
612 };
613
614 let key = (event_bus_name.clone(), name.clone());
615 let targets = state
616 .rules
617 .get(&key)
618 .map(|r| r.targets.clone())
619 .unwrap_or_default();
620
621 let tags = parse_tags(&body);
622
623 let rule = EventRule {
624 name: name.clone(),
625 arn: arn.clone(),
626 event_bus_name,
627 event_pattern,
628 schedule_expression,
629 state: rule_state,
630 description,
631 role_arn,
632 managed_by: None,
633 created_by: None,
634 targets,
635 tags,
636 last_fired: None,
637 };
638
639 state.rules.insert(key, rule);
640 Ok(json_resp(json!({ "RuleArn": arn })))
641 }
642
643 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
644 let body = parse_body(req);
645 validate_required("Name", &body["Name"])?;
646 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
647 validate_string_length("name", name, 1, 64)?;
648 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
649 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
650
651 let mut state = self.state.write();
652 let bus_name = state.resolve_bus_name(event_bus_name);
653 let key = (bus_name, name.to_string());
654
655 if let Some(rule) = state.rules.get(&key) {
657 if !rule.targets.is_empty() {
658 return Err(AwsServiceError::aws_error(
659 StatusCode::BAD_REQUEST,
660 "ValidationException",
661 "Rule can't be deleted since it has targets.",
662 ));
663 }
664 }
665
666 state.rules.remove(&key);
667 Ok(json_resp(json!({})))
668 }
669
670 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
671 let body = parse_body(req);
672 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
673 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
674 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
675 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
676 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
677 let name_prefix = body["NamePrefix"].as_str();
678 let limit = body["Limit"].as_u64().map(|n| n as usize);
679 let next_token = body["NextToken"].as_str();
680
681 let state = self.state.read();
682 let bus_name = state.resolve_bus_name(event_bus_name);
683
684 let mut rules: Vec<&EventRule> = state
685 .rules
686 .values()
687 .filter(|r| r.event_bus_name == bus_name)
688 .filter(|r| match name_prefix {
689 Some(prefix) => r.name.starts_with(prefix),
690 None => true,
691 })
692 .collect();
693 rules.sort_by(|a, b| a.name.cmp(&b.name));
694
695 let start = next_token
697 .and_then(|t| t.parse::<usize>().ok())
698 .unwrap_or(0)
699 .min(rules.len());
700 let rules_slice = &rules[start..];
701
702 let (page, new_next_token) = if let Some(lim) = limit {
703 if rules_slice.len() > lim {
704 (&rules_slice[..lim], Some((start + lim).to_string()))
705 } else {
706 (rules_slice, None)
707 }
708 } else {
709 (rules_slice, None)
710 };
711
712 let rules_json: Vec<Value> = page
713 .iter()
714 .map(|r| {
715 let mut obj = json!({
716 "Name": r.name,
717 "Arn": r.arn,
718 "EventBusName": r.event_bus_name,
719 "State": r.state,
720 });
721 if let Some(ref desc) = r.description {
722 obj["Description"] = json!(desc);
723 }
724 if let Some(ref ep) = r.event_pattern {
725 obj["EventPattern"] = json!(ep);
726 }
727 if let Some(ref se) = r.schedule_expression {
728 obj["ScheduleExpression"] = json!(se);
729 }
730 if let Some(ref mb) = r.managed_by {
731 obj["ManagedBy"] = json!(mb);
732 }
733 obj
734 })
735 .collect();
736
737 let mut resp = json!({ "Rules": rules_json });
738 if let Some(token) = new_next_token {
739 resp["NextToken"] = json!(token);
740 }
741
742 Ok(json_resp(resp))
743 }
744
745 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
746 let body = parse_body(req);
747 validate_required("Name", &body["Name"])?;
748 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
749 validate_string_length("name", name, 1, 64)?;
750 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
751 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
752
753 let state = self.state.read();
754 let bus_name = state.resolve_bus_name(event_bus_name);
755 let key = (bus_name.clone(), name.to_string());
756
757 let rule = state.rules.get(&key).ok_or_else(|| {
758 AwsServiceError::aws_error(
759 StatusCode::BAD_REQUEST,
760 "ResourceNotFoundException",
761 format!("Rule {name} does not exist."),
762 )
763 })?;
764
765 let mut resp = json!({
766 "Name": rule.name,
767 "Arn": rule.arn,
768 "EventBusName": rule.event_bus_name,
769 "State": rule.state,
770 });
771
772 if let Some(ref desc) = rule.description {
773 resp["Description"] = json!(desc);
774 }
775 if let Some(ref ep) = rule.event_pattern {
776 resp["EventPattern"] = json!(ep);
777 }
778 if let Some(ref se) = rule.schedule_expression {
779 resp["ScheduleExpression"] = json!(se);
780 }
781 if let Some(ref role) = rule.role_arn {
782 resp["RoleArn"] = json!(role);
783 }
784 if let Some(ref mb) = rule.managed_by {
785 resp["ManagedBy"] = json!(mb);
786 }
787 if let Some(ref cb) = rule.created_by {
788 resp["CreatedBy"] = json!(cb);
789 }
790 if rule.event_bus_name != "default" && rule.created_by.is_none() {
792 resp["CreatedBy"] = json!(state.account_id);
793 }
794
795 Ok(json_resp(resp))
796 }
797
798 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
799 let body = parse_body(req);
800 validate_required("Name", &body["Name"])?;
801 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
802 validate_string_length("name", name, 1, 64)?;
803 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
804 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
805
806 let mut state = self.state.write();
807 let bus_name = state.resolve_bus_name(event_bus_name);
808 let key = (bus_name, name.to_string());
809
810 let rule = state.rules.get_mut(&key).ok_or_else(|| {
811 AwsServiceError::aws_error(
812 StatusCode::BAD_REQUEST,
813 "ResourceNotFoundException",
814 format!("Rule {name} does not exist."),
815 )
816 })?;
817
818 rule.state = "ENABLED".to_string();
819 Ok(json_resp(json!({})))
820 }
821
822 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
823 let body = parse_body(req);
824 validate_required("Name", &body["Name"])?;
825 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
826 validate_string_length("name", name, 1, 64)?;
827 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
828 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
829
830 let mut state = self.state.write();
831 let bus_name = state.resolve_bus_name(event_bus_name);
832 let key = (bus_name, name.to_string());
833
834 let rule = state.rules.get_mut(&key).ok_or_else(|| {
835 AwsServiceError::aws_error(
836 StatusCode::BAD_REQUEST,
837 "ResourceNotFoundException",
838 format!("Rule {name} does not exist."),
839 )
840 })?;
841
842 rule.state = "DISABLED".to_string();
843 Ok(json_resp(json!({})))
844 }
845
846 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
849 let body = parse_body(req);
850 validate_required("Rule", &body["Rule"])?;
851 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
852 validate_string_length("rule", rule_name, 1, 64)?;
853 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
854 validate_required("Targets", &body["Targets"])?;
855 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
856 let targets = body["Targets"]
857 .as_array()
858 .ok_or_else(|| missing("Targets"))?;
859
860 for target in targets {
862 let target_id = target["Id"].as_str().unwrap_or("");
863 let target_arn = target["Arn"].as_str().unwrap_or("");
864
865 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
866 return Err(AwsServiceError::aws_error(
867 StatusCode::BAD_REQUEST,
868 "ValidationException",
869 format!(
870 "Parameter(s) SqsParameters must be specified for target: {target_id}."
871 ),
872 ));
873 }
874
875 if !target_arn.starts_with("arn:") {
877 return Err(AwsServiceError::aws_error(
878 StatusCode::BAD_REQUEST,
879 "ValidationException",
880 format!(
881 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
882 ),
883 ));
884 }
885 }
886
887 let mut state = self.state.write();
888 let bus_name = state.resolve_bus_name(event_bus_name);
889 let key = (bus_name.clone(), rule_name.to_string());
890
891 let rule = state.rules.get_mut(&key).ok_or_else(|| {
892 AwsServiceError::aws_error(
893 StatusCode::BAD_REQUEST,
894 "ResourceNotFoundException",
895 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
896 )
897 })?;
898
899 for target in targets {
900 let et = parse_target(target);
901 rule.targets.retain(|t| t.id != et.id);
903 rule.targets.push(et);
904 }
905
906 Ok(json_resp(json!({
907 "FailedEntryCount": 0,
908 "FailedEntries": [],
909 })))
910 }
911
912 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
913 let body = parse_body(req);
914 validate_required("Rule", &body["Rule"])?;
915 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
916 validate_string_length("rule", rule_name, 1, 64)?;
917 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
918 validate_required("Ids", &body["Ids"])?;
919 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
920 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
921
922 let target_ids: Vec<String> = ids
923 .iter()
924 .filter_map(|v| v.as_str().map(|s| s.to_string()))
925 .collect();
926
927 let mut state = self.state.write();
928 let bus_name = state.resolve_bus_name(event_bus_name);
929 let key = (bus_name.clone(), rule_name.to_string());
930
931 let rule = state.rules.get_mut(&key).ok_or_else(|| {
932 AwsServiceError::aws_error(
933 StatusCode::BAD_REQUEST,
934 "ResourceNotFoundException",
935 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
936 )
937 })?;
938
939 rule.targets.retain(|t| !target_ids.contains(&t.id));
940
941 Ok(json_resp(json!({
942 "FailedEntryCount": 0,
943 "FailedEntries": [],
944 })))
945 }
946
947 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
948 let body = parse_body(req);
949 validate_required("Rule", &body["Rule"])?;
950 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
951 validate_string_length("rule", rule_name, 1, 64)?;
952 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
953 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
954 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
955 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
956 let limit = body["Limit"].as_u64().map(|n| n as usize);
957 let next_token = body["NextToken"].as_str();
958
959 let state = self.state.read();
960 let bus_name = state.resolve_bus_name(event_bus_name);
961 let key = (bus_name, rule_name.to_string());
962
963 let rule = state.rules.get(&key).ok_or_else(|| {
964 AwsServiceError::aws_error(
965 StatusCode::BAD_REQUEST,
966 "ResourceNotFoundException",
967 format!("Rule {rule_name} does not exist."),
968 )
969 })?;
970
971 let all_targets = &rule.targets;
972 let start = next_token
973 .and_then(|t| t.parse::<usize>().ok())
974 .unwrap_or(0)
975 .min(all_targets.len());
976 let slice = &all_targets[start..];
977
978 let (page, new_next_token) = if let Some(lim) = limit {
979 if slice.len() > lim {
980 (&slice[..lim], Some((start + lim).to_string()))
981 } else {
982 (slice, None)
983 }
984 } else {
985 (slice, None)
986 };
987
988 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
989
990 let mut resp = json!({ "Targets": targets });
991 if let Some(token) = new_next_token {
992 resp["NextToken"] = json!(token);
993 }
994
995 Ok(json_resp(resp))
996 }
997
998 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
999 let body = parse_body(req);
1000 validate_required("TargetArn", &body["TargetArn"])?;
1001 let target_arn = body["TargetArn"]
1002 .as_str()
1003 .ok_or_else(|| missing("TargetArn"))?;
1004 validate_string_length("targetArn", target_arn, 1, 1600)?;
1005 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1006 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1007 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1008 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1009 let limit = body["Limit"].as_u64().map(|n| n as usize);
1010 let next_token = body["NextToken"].as_str();
1011
1012 let state = self.state.read();
1013 let bus_name = state.resolve_bus_name(event_bus_name);
1014
1015 let mut rule_names: Vec<String> = Vec::new();
1017 for rule in state.rules.values() {
1018 if rule.event_bus_name == bus_name
1019 && rule.targets.iter().any(|t| t.arn == target_arn)
1020 && !rule_names.contains(&rule.name)
1021 {
1022 rule_names.push(rule.name.clone());
1023 }
1024 }
1025 rule_names.sort();
1026
1027 let start = next_token
1028 .and_then(|t| t.parse::<usize>().ok())
1029 .unwrap_or(0)
1030 .min(rule_names.len());
1031 let slice = &rule_names[start..];
1032
1033 let (page, new_next_token) = if let Some(lim) = limit {
1034 if slice.len() > lim {
1035 (&slice[..lim], Some((start + lim).to_string()))
1036 } else {
1037 (slice, None)
1038 }
1039 } else {
1040 (slice, None)
1041 };
1042
1043 let mut resp = json!({ "RuleNames": page });
1044 if let Some(token) = new_next_token {
1045 resp["NextToken"] = json!(token);
1046 }
1047
1048 Ok(json_resp(resp))
1049 }
1050
1051 fn create_partner_event_source(
1054 &self,
1055 req: &AwsRequest,
1056 ) -> Result<AwsResponse, AwsServiceError> {
1057 let body = parse_body(req);
1058 validate_required("Name", &body["Name"])?;
1059 let name = body["Name"]
1060 .as_str()
1061 .ok_or_else(|| missing("Name"))?
1062 .to_string();
1063 validate_string_length("name", &name, 1, 256)?;
1064 validate_required("Account", &body["Account"])?;
1065 let account = body["Account"]
1066 .as_str()
1067 .ok_or_else(|| missing("Account"))?
1068 .to_string();
1069 validate_string_length("account", &account, 12, 12)?;
1070
1071 let mut state = self.state.write();
1072 if state.partner_event_sources.contains_key(&name) {
1073 return Err(AwsServiceError::aws_error(
1074 StatusCode::CONFLICT,
1075 "ResourceAlreadyExistsException",
1076 format!("Partner event source {name} already exists."),
1077 ));
1078 }
1079 let arn = format!(
1080 "arn:aws:events:{}::event-source/aws.partner/{}",
1081 state.region, name
1082 );
1083 let now = Utc::now();
1084 let ps = PartnerEventSource {
1085 name: name.clone(),
1086 arn: arn.clone(),
1087 account,
1088 creation_time: now,
1089 expiration_time: None,
1090 state: "ACTIVE".to_string(),
1091 };
1092 state.partner_event_sources.insert(name.clone(), ps);
1093
1094 Ok(json_resp(json!({ "EventSourceArn": arn })))
1095 }
1096
1097 fn delete_partner_event_source(
1098 &self,
1099 req: &AwsRequest,
1100 ) -> Result<AwsResponse, AwsServiceError> {
1101 let body = parse_body(req);
1102 validate_required("Name", &body["Name"])?;
1103 let name = body["Name"]
1104 .as_str()
1105 .ok_or_else(|| missing("Name"))?
1106 .to_string();
1107 validate_required("Account", &body["Account"])?;
1108 let account = body["Account"]
1109 .as_str()
1110 .ok_or_else(|| missing("Account"))?
1111 .to_string();
1112
1113 let mut state = self.state.write();
1114 match state.partner_event_sources.get(&name) {
1115 Some(ps) if ps.account == account => {
1116 state.partner_event_sources.remove(&name);
1117 }
1118 Some(_) => {
1119 return Err(AwsServiceError::aws_error(
1120 StatusCode::NOT_FOUND,
1121 "ResourceNotFoundException",
1122 format!("Partner event source {name} does not exist for account {account}."),
1123 ));
1124 }
1125 None => {
1126 return Err(AwsServiceError::aws_error(
1127 StatusCode::NOT_FOUND,
1128 "ResourceNotFoundException",
1129 format!("Partner event source {name} does not exist."),
1130 ));
1131 }
1132 }
1133
1134 Ok(json_resp(json!({})))
1135 }
1136
1137 fn describe_partner_event_source(
1138 &self,
1139 req: &AwsRequest,
1140 ) -> Result<AwsResponse, AwsServiceError> {
1141 let body = parse_body(req);
1142 validate_required("Name", &body["Name"])?;
1143 let name = body["Name"]
1144 .as_str()
1145 .ok_or_else(|| missing("Name"))?
1146 .to_string();
1147 validate_string_length("name", &name, 1, 256)?;
1148
1149 let state = self.state.read();
1150 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1151 AwsServiceError::aws_error(
1152 StatusCode::NOT_FOUND,
1153 "ResourceNotFoundException",
1154 format!("Partner event source {name} does not exist."),
1155 )
1156 })?;
1157
1158 Ok(json_resp(json!({
1159 "Arn": ps.arn,
1160 "Name": ps.name,
1161 })))
1162 }
1163
1164 fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1165 let body = parse_body(req);
1166 let name_prefix = body["NamePrefix"].as_str();
1167 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1168 let offset: usize = body["NextToken"]
1169 .as_str()
1170 .map(|t| t.parse().unwrap_or(0))
1171 .unwrap_or(0);
1172
1173 let state = self.state.read();
1174 let filtered: Vec<Value> = state
1175 .partner_event_sources
1176 .values()
1177 .filter(|ps| match name_prefix {
1178 Some(prefix) => ps.name.starts_with(prefix),
1179 None => true,
1180 })
1181 .skip(offset)
1182 .take(limit + 1)
1183 .map(|ps| {
1184 json!({
1185 "Arn": ps.arn,
1186 "Name": ps.name,
1187 "CreationTime": ps.creation_time.timestamp() as f64,
1188 })
1189 })
1190 .collect();
1191
1192 let has_more = filtered.len() > limit;
1193 let sources: Vec<Value> = filtered.into_iter().take(limit).collect();
1194 let mut resp = json!({ "PartnerEventSources": sources });
1195 if has_more {
1196 resp["NextToken"] = json!((offset + limit).to_string());
1197 }
1198
1199 Ok(json_resp(resp))
1200 }
1201
1202 fn list_partner_event_source_accounts(
1203 &self,
1204 req: &AwsRequest,
1205 ) -> Result<AwsResponse, AwsServiceError> {
1206 let body = parse_body(req);
1207 validate_required("EventSourceName", &body["EventSourceName"])?;
1208 let event_source_name = body["EventSourceName"]
1209 .as_str()
1210 .ok_or_else(|| missing("EventSourceName"))?;
1211
1212 let state = self.state.read();
1213 let accounts: Vec<Value> = state
1214 .partner_event_sources
1215 .values()
1216 .filter(|ps| ps.name == event_source_name)
1217 .map(|ps| json!({ "Account": ps.account }))
1218 .collect();
1219
1220 Ok(json_resp(json!({
1221 "PartnerEventSourceAccounts": accounts
1222 })))
1223 }
1224
1225 fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1226 let body = parse_body(req);
1227 validate_required("Name", &body["Name"])?;
1228 let name = body["Name"]
1229 .as_str()
1230 .ok_or_else(|| missing("Name"))?
1231 .to_string();
1232
1233 let mut state = self.state.write();
1234 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1235 AwsServiceError::aws_error(
1236 StatusCode::NOT_FOUND,
1237 "ResourceNotFoundException",
1238 format!("Event source {name} does not exist."),
1239 )
1240 })?;
1241 ps.state = "ACTIVE".to_string();
1242
1243 Ok(json_resp(json!({})))
1244 }
1245
1246 fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1247 let body = parse_body(req);
1248 validate_required("Name", &body["Name"])?;
1249 let name = body["Name"]
1250 .as_str()
1251 .ok_or_else(|| missing("Name"))?
1252 .to_string();
1253
1254 let mut state = self.state.write();
1255 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1256 AwsServiceError::aws_error(
1257 StatusCode::NOT_FOUND,
1258 "ResourceNotFoundException",
1259 format!("Event source {name} does not exist."),
1260 )
1261 })?;
1262 ps.state = "INACTIVE".to_string();
1263
1264 Ok(json_resp(json!({})))
1265 }
1266
1267 fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1268 let body = parse_body(req);
1269 validate_required("Name", &body["Name"])?;
1270 let name = body["Name"]
1271 .as_str()
1272 .ok_or_else(|| missing("Name"))?
1273 .to_string();
1274
1275 let state = self.state.read();
1276 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1277 AwsServiceError::aws_error(
1278 StatusCode::NOT_FOUND,
1279 "ResourceNotFoundException",
1280 format!("Event source {name} does not exist."),
1281 )
1282 })?;
1283
1284 Ok(json_resp(json!({
1285 "Arn": ps.arn,
1286 "Name": ps.name,
1287 "CreatedBy": ps.account,
1288 "CreationTime": ps.creation_time.timestamp() as f64,
1289 "State": ps.state,
1290 })))
1291 }
1292
1293 fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1294 let body = parse_body(req);
1295 let name_prefix = body["NamePrefix"].as_str();
1296 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1297 let offset: usize = body["NextToken"]
1298 .as_str()
1299 .map(|t| t.parse().unwrap_or(0))
1300 .unwrap_or(0);
1301
1302 let state = self.state.read();
1303 let filtered: Vec<Value> = state
1304 .partner_event_sources
1305 .values()
1306 .filter(|ps| match name_prefix {
1307 Some(prefix) => ps.name.starts_with(prefix),
1308 None => true,
1309 })
1310 .skip(offset)
1311 .take(limit + 1)
1312 .map(|ps| {
1313 json!({
1314 "Arn": ps.arn,
1315 "Name": ps.name,
1316 "CreatedBy": ps.account,
1317 "CreationTime": ps.creation_time.timestamp() as f64,
1318 "State": ps.state,
1319 })
1320 })
1321 .collect();
1322
1323 let has_more = filtered.len() > limit;
1324 let sources: Vec<Value> = filtered.into_iter().take(limit).collect();
1325 let mut resp = json!({ "EventSources": sources });
1326 if has_more {
1327 resp["NextToken"] = json!((offset + limit).to_string());
1328 }
1329
1330 Ok(json_resp(resp))
1331 }
1332
1333 fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1334 let body = parse_body(req);
1335 validate_required("Entries", &body["Entries"])?;
1336 let entries = body["Entries"]
1337 .as_array()
1338 .ok_or_else(|| missing("Entries"))?;
1339
1340 let mut result_entries = Vec::new();
1341 for _entry in entries {
1342 let event_id = uuid::Uuid::new_v4().to_string();
1343 result_entries.push(json!({ "EventId": event_id }));
1344 }
1345
1346 Ok(json_resp(json!({
1347 "FailedEntryCount": 0,
1348 "Entries": result_entries,
1349 })))
1350 }
1351
1352 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1355 let body = parse_body(req);
1356 validate_required("EventPattern", &body["EventPattern"])?;
1357 validate_required("Event", &body["Event"])?;
1358 let event_pattern = body["EventPattern"]
1359 .as_str()
1360 .ok_or_else(|| missing("EventPattern"))?;
1361 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1362
1363 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1365 AwsServiceError::aws_error(
1366 StatusCode::BAD_REQUEST,
1367 "InvalidEventPatternException",
1368 "Event is not valid JSON.",
1369 )
1370 })?;
1371
1372 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1374 AwsServiceError::aws_error(
1375 StatusCode::BAD_REQUEST,
1376 "InvalidEventPatternException",
1377 "Event pattern is not valid JSON.",
1378 )
1379 })?;
1380
1381 let source = event["source"].as_str().unwrap_or("");
1382 let detail_type = event["detail-type"].as_str().unwrap_or("");
1383 let detail = event
1384 .get("detail")
1385 .map(|v| serde_json::to_string(v).unwrap_or_default())
1386 .unwrap_or_else(|| "{}".to_string());
1387 let account = event["account"].as_str().unwrap_or("");
1388 let region = event["region"].as_str().unwrap_or("");
1389 let resources: Vec<String> = event["resources"]
1390 .as_array()
1391 .map(|arr| {
1392 arr.iter()
1393 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1394 .collect()
1395 })
1396 .unwrap_or_default();
1397
1398 let result = matches_pattern(
1399 Some(event_pattern),
1400 source,
1401 detail_type,
1402 &detail,
1403 account,
1404 region,
1405 &resources,
1406 );
1407
1408 Ok(json_resp(json!({ "Result": result })))
1409 }
1410
1411 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1414 let body = parse_body(req);
1415 let name = body["Name"].as_str().unwrap_or("default");
1416
1417 let mut state = self.state.write();
1418 let bus = state.buses.get_mut(name).ok_or_else(|| {
1419 AwsServiceError::aws_error(
1420 StatusCode::BAD_REQUEST,
1421 "ResourceNotFoundException",
1422 format!("Event bus {name} does not exist."),
1423 )
1424 })?;
1425
1426 if let Some(desc) = body["Description"].as_str() {
1427 bus.description = Some(desc.to_string());
1428 }
1429 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1430 bus.kms_key_identifier = Some(kms.to_string());
1431 }
1432 if let Some(dlc) = body.get("DeadLetterConfig") {
1433 bus.dead_letter_config = Some(dlc.clone());
1434 }
1435 bus.last_modified_time = Utc::now();
1436
1437 let arn = bus.arn.clone();
1438 let bus_name = bus.name.clone();
1439
1440 Ok(json_resp(json!({
1441 "Arn": arn,
1442 "Name": bus_name,
1443 })))
1444 }
1445
1446 fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1449 let body = parse_body(req);
1450 validate_required("Name", &body["Name"])?;
1451 let name = body["Name"]
1452 .as_str()
1453 .ok_or_else(|| missing("Name"))?
1454 .to_string();
1455 validate_string_length("name", &name, 1, 64)?;
1456 validate_required("RoutingConfig", &body["RoutingConfig"])?;
1457 validate_required("EventBuses", &body["EventBuses"])?;
1458
1459 let description = body["Description"].as_str().map(|s| s.to_string());
1460 let routing_config = body["RoutingConfig"].clone();
1461 let replication_config = body.get("ReplicationConfig").cloned();
1462 let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1463 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1464
1465 let mut state = self.state.write();
1466 if state.endpoints.contains_key(&name) {
1467 return Err(AwsServiceError::aws_error(
1468 StatusCode::CONFLICT,
1469 "ResourceAlreadyExistsException",
1470 format!("Endpoint {name} already exists."),
1471 ));
1472 }
1473
1474 let endpoint_id = format!("{}.abc123", name);
1475 let arn = format!(
1476 "arn:aws:events:{}:{}:endpoint/{}",
1477 req.region, state.account_id, name
1478 );
1479 let endpoint_url = format!(
1480 "https://{}.endpoint.events.{}.amazonaws.com",
1481 endpoint_id, req.region
1482 );
1483 let now = Utc::now();
1484
1485 let endpoint = Endpoint {
1486 name: name.clone(),
1487 arn: arn.clone(),
1488 endpoint_id: endpoint_id.clone(),
1489 endpoint_url: Some(endpoint_url),
1490 description,
1491 routing_config: routing_config.clone(),
1492 replication_config: replication_config.clone(),
1493 event_buses: event_buses.clone(),
1494 role_arn: role_arn.clone(),
1495 state: "ACTIVE".to_string(),
1496 creation_time: now,
1497 last_modified_time: now,
1498 };
1499 state.endpoints.insert(name.clone(), endpoint);
1500
1501 let mut resp = json!({
1502 "Name": name,
1503 "Arn": arn,
1504 "State": "ACTIVE",
1505 "RoutingConfig": routing_config,
1506 "EventBuses": event_buses,
1507 });
1508 if let Some(ref rc) = replication_config {
1509 resp["ReplicationConfig"] = rc.clone();
1510 }
1511 if let Some(ref ra) = role_arn {
1512 resp["RoleArn"] = json!(ra);
1513 }
1514
1515 Ok(json_resp(resp))
1516 }
1517
1518 fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1519 let body = parse_body(req);
1520 validate_required("Name", &body["Name"])?;
1521 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1522
1523 let mut state = self.state.write();
1524 state.endpoints.remove(name).ok_or_else(|| {
1525 AwsServiceError::aws_error(
1526 StatusCode::BAD_REQUEST,
1527 "ResourceNotFoundException",
1528 format!("Endpoint '{name}' does not exist."),
1529 )
1530 })?;
1531
1532 Ok(json_resp(json!({})))
1533 }
1534
1535 fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1536 let body = parse_body(req);
1537 validate_required("Name", &body["Name"])?;
1538 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1539
1540 let state = self.state.read();
1541 let ep = state.endpoints.get(name).ok_or_else(|| {
1542 AwsServiceError::aws_error(
1543 StatusCode::BAD_REQUEST,
1544 "ResourceNotFoundException",
1545 format!("Endpoint '{name}' does not exist."),
1546 )
1547 })?;
1548
1549 let mut resp = json!({
1550 "Name": ep.name,
1551 "Arn": ep.arn,
1552 "EndpointId": ep.endpoint_id,
1553 "State": ep.state,
1554 "RoutingConfig": ep.routing_config,
1555 "EventBuses": ep.event_buses,
1556 "CreationTime": ep.creation_time.timestamp() as f64,
1557 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1558 });
1559 if let Some(ref url) = ep.endpoint_url {
1560 resp["EndpointUrl"] = json!(url);
1561 }
1562 if let Some(ref desc) = ep.description {
1563 resp["Description"] = json!(desc);
1564 }
1565 if let Some(ref rc) = ep.replication_config {
1566 resp["ReplicationConfig"] = rc.clone();
1567 }
1568 if let Some(ref ra) = ep.role_arn {
1569 resp["RoleArn"] = json!(ra);
1570 }
1571
1572 Ok(json_resp(resp))
1573 }
1574
1575 fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1576 let body = parse_body(req);
1577 let name_prefix = body["NamePrefix"].as_str();
1578 let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1579 let offset: usize = body["NextToken"]
1580 .as_str()
1581 .map(|t| t.parse().unwrap_or(0))
1582 .unwrap_or(0);
1583
1584 let state = self.state.read();
1585 let filtered: Vec<Value> = state
1586 .endpoints
1587 .values()
1588 .filter(|ep| match name_prefix {
1589 Some(prefix) => ep.name.starts_with(prefix),
1590 None => true,
1591 })
1592 .skip(offset)
1593 .take(limit + 1)
1594 .map(|ep| {
1595 let mut obj = json!({
1596 "Name": ep.name,
1597 "Arn": ep.arn,
1598 "EndpointId": ep.endpoint_id,
1599 "State": ep.state,
1600 "RoutingConfig": ep.routing_config,
1601 "EventBuses": ep.event_buses,
1602 "CreationTime": ep.creation_time.timestamp() as f64,
1603 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1604 });
1605 if let Some(ref url) = ep.endpoint_url {
1606 obj["EndpointUrl"] = json!(url);
1607 }
1608 obj
1609 })
1610 .collect();
1611
1612 let has_more = filtered.len() > limit;
1613 let endpoints: Vec<Value> = filtered.into_iter().take(limit).collect();
1614 let mut resp = json!({ "Endpoints": endpoints });
1615 if has_more {
1616 resp["NextToken"] = json!((offset + limit).to_string());
1617 }
1618
1619 Ok(json_resp(resp))
1620 }
1621
1622 fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1623 let body = parse_body(req);
1624 validate_required("Name", &body["Name"])?;
1625 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1626
1627 let mut state = self.state.write();
1628 let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1629 AwsServiceError::aws_error(
1630 StatusCode::BAD_REQUEST,
1631 "ResourceNotFoundException",
1632 format!("Endpoint '{name}' does not exist."),
1633 )
1634 })?;
1635
1636 if let Some(desc) = body["Description"].as_str() {
1637 ep.description = Some(desc.to_string());
1638 }
1639 if !body["RoutingConfig"].is_null() {
1640 ep.routing_config = body["RoutingConfig"].clone();
1641 }
1642 if let Some(rc) = body.get("ReplicationConfig") {
1643 ep.replication_config = Some(rc.clone());
1644 }
1645 if let Some(buses) = body["EventBuses"].as_array() {
1646 ep.event_buses = buses.clone();
1647 }
1648 if let Some(ra) = body["RoleArn"].as_str() {
1649 ep.role_arn = Some(ra.to_string());
1650 }
1651 ep.last_modified_time = Utc::now();
1652
1653 let resp = json!({
1654 "Name": ep.name,
1655 "Arn": ep.arn,
1656 "EndpointId": ep.endpoint_id,
1657 "State": ep.state,
1658 "RoutingConfig": ep.routing_config,
1659 "EventBuses": ep.event_buses,
1660 });
1661
1662 Ok(json_resp(resp))
1663 }
1664
1665 fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1668 let body = parse_body(req);
1669 validate_required("Name", &body["Name"])?;
1670 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1671 validate_string_length("name", name, 1, 64)?;
1672
1673 let mut state = self.state.write();
1674 let conn = state.connections.get_mut(name).ok_or_else(|| {
1675 AwsServiceError::aws_error(
1676 StatusCode::BAD_REQUEST,
1677 "ResourceNotFoundException",
1678 format!("Connection '{name}' does not exist."),
1679 )
1680 })?;
1681
1682 conn.connection_state = "DEAUTHORIZING".to_string();
1683 conn.last_modified_time = Utc::now();
1684
1685 let resp = json!({
1686 "ConnectionArn": conn.arn,
1687 "ConnectionState": conn.connection_state,
1688 "CreationTime": conn.creation_time.timestamp() as f64,
1689 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1690 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1691 });
1692
1693 Ok(json_resp(resp))
1694 }
1695
    /// Handles `PutEvents`.
    ///
    /// Works in two phases:
    ///
    /// 1. With the state write guard held, each entry is validated, recorded in
    ///    `state.events`, appended to every enabled archive whose bus and
    ///    pattern match, and paired with the targets of all enabled rules on
    ///    the same bus whose pattern matches.
    /// 2. After the guard is dropped, each collected event is rendered as an
    ///    event JSON document and dispatched to its targets, with the delivery
    ///    mechanism chosen from the target ARN.
    ///
    /// Entry-level problems (missing `Source` / `DetailType` / `Detail`, or a
    /// `Detail` that is not valid JSON) do not fail the request: they are
    /// reported in the response `Entries` with an `ErrorCode` and counted in
    /// `FailedEntryCount`.
    ///
    /// # Errors
    ///
    /// Returns a validation error when `Entries` is missing or not an array,
    /// when `EndpointId` is outside 1–50 characters, or when `Entries` holds
    /// fewer than 1 or more than 10 items.
    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = parse_body(req);
        validate_required("Entries", &body["Entries"])?;
        validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
        let entries = body["Entries"]
            .as_array()
            .ok_or_else(|| missing("Entries"))?;

        // Request-level bounds on the batch size: 1..=10 entries.
        if entries.is_empty() {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
            ));
        }
        if entries.len() > 10 {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
            ));
        }

        // Phase 1: this guard is held for the whole entry loop and released by
        // the explicit `drop(state)` before any delivery happens.
        let mut state = self.state.write();
        let mut result_entries = Vec::new();
        let mut events_to_deliver = Vec::new();
        let mut failed_count = 0;

        for entry in entries {
            // Missing or non-string fields read as "" and are then rejected by
            // the emptiness checks below.
            let source = entry["Source"].as_str().unwrap_or("").to_string();
            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
            let detail = entry["Detail"].as_str().unwrap_or("").to_string();

            // Per-entry validation: a failing entry gets an error object in
            // the result list (keeping positions aligned with the request) and
            // processing moves on to the next entry.
            if source.is_empty() {
                failed_count += 1;
                result_entries.push(json!({
                    "ErrorCode": "InvalidArgument",
                    "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
                }));
                continue;
            }
            if detail_type.is_empty() {
                failed_count += 1;
                result_entries.push(json!({
                    "ErrorCode": "InvalidArgument",
                    "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
                }));
                continue;
            }
            if detail.is_empty() {
                failed_count += 1;
                result_entries.push(json!({
                    "ErrorCode": "InvalidArgument",
                    "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
                }));
                continue;
            }

            // `Detail` must be a JSON document (any JSON value is accepted here).
            if serde_json::from_str::<Value>(&detail).is_err() {
                failed_count += 1;
                result_entries.push(json!({
                    "ErrorCode": "MalformedDetail",
                    "ErrorMessage": "Detail is malformed.",
                }));
                continue;
            }

            let event_id = uuid::Uuid::new_v4().to_string();
            let raw_bus = entry["EventBusName"]
                .as_str()
                .unwrap_or("default")
                .to_string();
            let event_bus_name = state.resolve_bus_name(&raw_bus);
            // `Time` may be an RFC 3339 string or a numeric epoch-seconds
            // value; anything unparsable or absent falls back to "now".
            // NOTE(review): serde_json's `as_f64` also succeeds for integer
            // numbers, so the `as_i64` branch below looks unreachable — confirm.
            let time = if let Some(s) = entry["Time"].as_str() {
                DateTime::parse_from_rfc3339(s)
                    .map(|dt| dt.with_timezone(&Utc))
                    .unwrap_or_else(|_| Utc::now())
            } else if let Some(ts) = entry["Time"].as_f64() {
                DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
                    .unwrap_or_else(Utc::now)
            } else if let Some(ts) = entry["Time"].as_i64() {
                DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now)
            } else {
                Utc::now()
            };
            // Non-string members of `Resources` are skipped.
            let resources: Vec<String> = entry["Resources"]
                .as_array()
                .map(|arr| {
                    arr.iter()
                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
                        .collect()
                })
                .unwrap_or_default();

            let event = PutEvent {
                event_id: event_id.clone(),
                source: source.clone(),
                detail_type: detail_type.clone(),
                detail: detail.clone(),
                event_bus_name: event_bus_name.clone(),
                time,
                resources: resources.clone(),
            };

            // Archive keys are cloned up front so the loop body can take a
            // mutable borrow of `state.archives` for the matching archive.
            let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
            for akey in archive_keys {
                let (archive_bus, archive_pattern, archive_enabled) = {
                    let a = &state.archives[&akey];
                    (
                        // The archive's source ARN is resolved to a bus name
                        // so it can be compared with the event's bus.
                        state.resolve_bus_name(&a.event_source_arn),
                        a.event_pattern.clone(),
                        a.state == "ENABLED",
                    )
                };
                if archive_bus == event_bus_name && archive_enabled {
                    let pattern_matches = matches_pattern(
                        archive_pattern.as_deref(),
                        &source,
                        &detail_type,
                        &detail,
                        &req.account_id,
                        &req.region,
                        &resources,
                    );
                    if pattern_matches {
                        if let Some(archive) = state.archives.get_mut(&akey) {
                            archive.event_count += 1;
                            // Archive size is tracked as the byte length of `Detail`.
                            archive.size_bytes += detail.len() as i64;
                            archive.events.push(event.clone());
                        }
                    }
                }
            }

            state.events.push(event);

            // Targets of every enabled rule on this bus whose pattern matches.
            let matching_targets: Vec<EventTarget> = state
                .rules
                .values()
                .filter(|r| {
                    r.event_bus_name == event_bus_name
                        && r.state == "ENABLED"
                        && matches_pattern(
                            r.event_pattern.as_deref(),
                            &source,
                            &detail_type,
                            &detail,
                            &req.account_id,
                            &req.region,
                            &resources,
                        )
                })
                .flat_map(|r| r.targets.clone())
                .collect();

            if !matching_targets.is_empty() {
                events_to_deliver.push((
                    event_id.clone(),
                    source,
                    detail_type,
                    detail,
                    time,
                    resources,
                    matching_targets,
                ));
            }

            result_entries.push(json!({ "EventId": event_id }));
        }

        // Release the guard before delivery: several branches below take the
        // state write guard again.
        drop(state);

        // Phase 2: dispatch each event to its matched targets.
        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
            let event_json = json!({
                "version": "0",
                "id": event_id,
                "source": source,
                "account": req.account_id,
                "detail-type": detail_type,
                "detail": detail_value,
                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
                "region": req.region,
                "resources": resources,
            });
            let event_str = event_json.to_string();

            for target in targets {
                let arn = &target.arn;
                // Payload selection, first match wins: input transformer,
                // constant `input`, `input_path` extraction (falling back to
                // the full event when the path does not resolve), full event.
                let body_str = if let Some(ref transformer) = target.input_transformer {
                    apply_input_transformer(transformer, &event_json)
                } else if let Some(ref input) = target.input {
                    input.clone()
                } else if let Some(ref input_path) = target.input_path {
                    resolve_json_path(&event_json, input_path)
                        .map(|v| v.to_string())
                        .unwrap_or_else(|| event_str.clone())
                } else {
                    event_str.clone()
                };

                // The delivery mechanism is chosen by substring/prefix of the
                // target ARN; a target matching none of the branches is not
                // delivered anywhere.
                if arn.contains(":sqs:") {
                    // A `MessageGroupId` in the target's SQS parameters is
                    // forwarded via the attribute-aware send.
                    let group_id = target
                        .sqs_parameters
                        .as_ref()
                        .and_then(|p| p["MessageGroupId"].as_str())
                        .map(|s| s.to_string());
                    if group_id.is_some() {
                        self.delivery.send_to_sqs_with_attrs(
                            arn,
                            &body_str,
                            &HashMap::new(),
                            group_id.as_deref(),
                            None,
                        );
                    } else {
                        self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
                    }
                } else if arn.contains(":sns:") {
                    // The event's detail-type is passed as the third argument.
                    self.delivery
                        .publish_to_sns(arn, &body_str, Some(&detail_type));
                } else if arn.contains(":lambda:") {
                    tracing::info!(
                        function_arn = %arn,
                        payload = %body_str,
                        "EventBridge delivering to Lambda function"
                    );
                    let now = Utc::now();
                    // Record the invocation in EventBridge's own state...
                    let mut state = self.state.write();
                    state
                        .lambda_invocations
                        .push(crate::state::LambdaInvocation {
                            function_arn: arn.clone(),
                            payload: body_str.clone(),
                            timestamp: now,
                        });
                    drop(state);
                    // ...and, when a Lambda state was attached, there as well.
                    if let Some(ref ls) = self.lambda_state {
                        ls.write().invocations.push(LambdaInvocation {
                            function_arn: arn.clone(),
                            payload: body_str.clone(),
                            timestamp: now,
                            source: "aws:events".to_string(),
                        });
                    }
                } else if arn.contains(":logs:") {
                    tracing::info!(
                        log_group_arn = %arn,
                        payload = %body_str,
                        "EventBridge delivering to CloudWatch Logs"
                    );
                    let now = Utc::now();
                    // Record the delivery locally, then forward it to the logs
                    // state when one was attached.
                    let mut state = self.state.write();
                    state.log_deliveries.push(crate::state::LogDelivery {
                        log_group_arn: arn.clone(),
                        payload: body_str.clone(),
                        timestamp: now,
                    });
                    drop(state);
                    if let Some(ref log_state) = self.logs_state {
                        deliver_to_logs(log_state, arn, &body_str, now);
                    }
                } else if arn.contains(":states:") {
                    tracing::info!(
                        state_machine_arn = %arn,
                        payload = %body_str,
                        "EventBridge delivering to Step Functions (stub)"
                    );
                    // Only recorded; the guard is released at the end of this branch.
                    let mut state = self.state.write();
                    state
                        .step_function_executions
                        .push(crate::state::StepFunctionExecution {
                            state_machine_arn: arn.clone(),
                            payload: body_str.clone(),
                            timestamp: Utc::now(),
                        });
                } else if arn.starts_with("https://") || arn.starts_with("http://") {
                    // A target "ARN" that is a plain URL is POSTed on a
                    // spawned task that is not awaited; a failed send is only
                    // logged.
                    let url = arn.clone();
                    let payload = body_str.clone();
                    tokio::spawn(async move {
                        let client = reqwest::Client::new();
                        let result = client
                            .post(&url)
                            .header("Content-Type", "application/json")
                            .body(payload)
                            .send()
                            .await;
                        if let Err(e) = result {
                            tracing::warn!(
                                endpoint = %url,
                                error = %e,
                                "EventBridge HTTP target delivery failed"
                            );
                        }
                    });
                }
            }
        }

        let resp = json!({
            "FailedEntryCount": failed_count,
            "Entries": result_entries,
        });

        Ok(json_resp(resp))
    }
2020
2021 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2024 let body = parse_body(req);
2025 validate_required("ResourceARN", &body["ResourceARN"])?;
2026 let arn = body["ResourceARN"]
2027 .as_str()
2028 .ok_or_else(|| missing("ResourceARN"))?;
2029 validate_string_length("resourceARN", arn, 1, 1600)?;
2030 validate_required("Tags", &body["Tags"])?;
2031 let tags = body["Tags"].as_array().ok_or_else(|| missing("Tags"))?;
2032
2033 let mut state = self.state.write();
2034
2035 let tag_map = find_tags_mut(&mut state, arn)?;
2036
2037 for tag in tags {
2038 if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
2039 tag_map.insert(key.to_string(), val.to_string());
2040 }
2041 }
2042
2043 Ok(json_resp(json!({})))
2044 }
2045
2046 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2047 let body = parse_body(req);
2048 validate_required("ResourceARN", &body["ResourceARN"])?;
2049 let arn = body["ResourceARN"]
2050 .as_str()
2051 .ok_or_else(|| missing("ResourceARN"))?;
2052 validate_string_length("resourceARN", arn, 1, 1600)?;
2053 validate_required("TagKeys", &body["TagKeys"])?;
2054 let tag_keys = body["TagKeys"]
2055 .as_array()
2056 .ok_or_else(|| missing("TagKeys"))?;
2057
2058 let mut state = self.state.write();
2059 let tag_map = find_tags_mut(&mut state, arn)?;
2060
2061 for key in tag_keys {
2062 if let Some(k) = key.as_str() {
2063 tag_map.remove(k);
2064 }
2065 }
2066
2067 Ok(json_resp(json!({})))
2068 }
2069
2070 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2071 let body = parse_body(req);
2072 validate_required("ResourceARN", &body["ResourceARN"])?;
2073 let arn = body["ResourceARN"]
2074 .as_str()
2075 .ok_or_else(|| missing("ResourceARN"))?;
2076 validate_string_length("resourceARN", arn, 1, 1600)?;
2077
2078 let state = self.state.read();
2079 let tag_map = find_tags(&state, arn)?;
2080
2081 let tags: Vec<Value> = tag_map
2082 .iter()
2083 .map(|(k, v)| json!({"Key": k, "Value": v}))
2084 .collect();
2085
2086 Ok(json_resp(json!({ "Tags": tags })))
2087 }
2088
2089 fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2092 let body = parse_body(req);
2093 validate_required("ArchiveName", &body["ArchiveName"])?;
2094 let name = body["ArchiveName"]
2095 .as_str()
2096 .ok_or_else(|| missing("ArchiveName"))?
2097 .to_string();
2098 validate_string_length("archiveName", &name, 1, 48)?;
2099 validate_required("EventSourceArn", &body["EventSourceArn"])?;
2100 let event_source_arn = body["EventSourceArn"]
2101 .as_str()
2102 .ok_or_else(|| missing("EventSourceArn"))?
2103 .to_string();
2104 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2105 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2106 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2107 if let Some(rd) = body["RetentionDays"].as_i64() {
2108 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2109 }
2110 let description = body["Description"].as_str().map(|s| s.to_string());
2111 let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2112 let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2113
2114 if let Some(ref pattern) = event_pattern {
2116 validate_event_pattern(pattern)?;
2117 }
2118
2119 let mut state = self.state.write();
2120
2121 let bus_name = state.resolve_bus_name(&event_source_arn);
2123 if !state.buses.contains_key(&bus_name) {
2124 return Err(AwsServiceError::aws_error(
2125 StatusCode::BAD_REQUEST,
2126 "ResourceNotFoundException",
2127 format!("Event bus {bus_name} does not exist."),
2128 ));
2129 }
2130
2131 if state.archives.contains_key(&name) {
2133 return Err(AwsServiceError::aws_error(
2134 StatusCode::BAD_REQUEST,
2135 "ResourceAlreadyExistsException",
2136 format!("Archive {name} already exists."),
2137 ));
2138 }
2139
2140 let now = Utc::now();
2141 let arn = format!(
2142 "arn:aws:events:{}:{}:archive/{}",
2143 req.region, state.account_id, name
2144 );
2145
2146 let archive = Archive {
2147 name: name.clone(),
2148 arn: arn.clone(),
2149 event_source_arn: event_source_arn.clone(),
2150 description,
2151 event_pattern: event_pattern.clone(),
2152 retention_days,
2153 state: "ENABLED".to_string(),
2154 creation_time: now,
2155 event_count: 0,
2156 size_bytes: 0,
2157 events: Vec::new(),
2158 };
2159 state.archives.insert(name.clone(), archive);
2160
2161 let rule_name = format!("Events-Archive-{name}");
2163 let rule_arn = format!(
2164 "arn:aws:events:{}:{}:rule/{}",
2165 req.region, state.account_id, rule_name
2166 );
2167 let rule_event_pattern = {
2169 let mut merged = if let Some(ref ep) = event_pattern {
2170 serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2171 } else {
2172 json!({})
2173 };
2174 if let Some(obj) = merged.as_object_mut() {
2175 obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2176 }
2177 serde_json::to_string(&merged).unwrap_or_default()
2178 };
2179
2180 let archive_target = EventTarget {
2182 id: name.clone(),
2183 arn: format!("arn:aws:events:{}:::", req.region),
2184 input: None,
2185 input_path: None,
2186 input_transformer: Some(json!({
2187 "InputPathsMap": {},
2188 "InputTemplate": format!(
2189 "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2190 arn
2191 )
2192 })),
2193 sqs_parameters: None,
2194 };
2195
2196 let archive_rule = EventRule {
2197 name: rule_name.clone(),
2198 arn: rule_arn,
2199 event_bus_name: bus_name.clone(),
2200 event_pattern: Some(rule_event_pattern),
2201 schedule_expression: None,
2202 state: "ENABLED".to_string(),
2203 description: None,
2204 role_arn: None,
2205 managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2206 created_by: Some(state.account_id.clone()),
2207 targets: vec![archive_target],
2208 tags: HashMap::new(),
2209 last_fired: None,
2210 };
2211 let key = (bus_name, rule_name);
2212 state.rules.insert(key, archive_rule);
2213
2214 Ok(json_resp(json!({
2215 "ArchiveArn": arn,
2216 "CreationTime": now.timestamp() as f64,
2217 "State": "ENABLED",
2218 })))
2219 }
2220
2221 fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2222 let body = parse_body(req);
2223 validate_required("ArchiveName", &body["ArchiveName"])?;
2224 let name = body["ArchiveName"]
2225 .as_str()
2226 .ok_or_else(|| missing("ArchiveName"))?;
2227 validate_string_length("archiveName", name, 1, 48)?;
2228
2229 let state = self.state.read();
2230 let archive = state.archives.get(name).ok_or_else(|| {
2231 AwsServiceError::aws_error(
2232 StatusCode::BAD_REQUEST,
2233 "ResourceNotFoundException",
2234 format!("Archive {name} does not exist."),
2235 )
2236 })?;
2237
2238 let mut resp = json!({
2239 "ArchiveArn": archive.arn,
2240 "ArchiveName": archive.name,
2241 "CreationTime": archive.creation_time.timestamp() as f64,
2242 "EventCount": archive.event_count,
2243 "EventSourceArn": archive.event_source_arn,
2244 "RetentionDays": archive.retention_days,
2245 "SizeBytes": archive.size_bytes,
2246 "State": archive.state,
2247 });
2248 if let Some(ref desc) = archive.description {
2249 resp["Description"] = json!(desc);
2250 }
2251 if let Some(ref ep) = archive.event_pattern {
2252 resp["EventPattern"] = json!(ep);
2253 }
2254
2255 Ok(json_resp(resp))
2256 }
2257
2258 fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2259 let body = parse_body(req);
2260 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2261 validate_optional_string_length(
2262 "eventSourceArn",
2263 body["EventSourceArn"].as_str(),
2264 1,
2265 1600,
2266 )?;
2267 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2268 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2269 let name_prefix = body["NamePrefix"].as_str();
2270 let source_arn = body["EventSourceArn"].as_str();
2271 let archive_state = body["State"].as_str();
2272
2273 let filter_count = [
2275 name_prefix.is_some(),
2276 source_arn.is_some(),
2277 archive_state.is_some(),
2278 ]
2279 .iter()
2280 .filter(|&&x| x)
2281 .count();
2282 if filter_count > 1 {
2283 return Err(AwsServiceError::aws_error(
2284 StatusCode::BAD_REQUEST,
2285 "ValidationException",
2286 "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2287 ));
2288 }
2289
2290 if let Some(s) = archive_state {
2292 let valid = [
2293 "ENABLED",
2294 "DISABLED",
2295 "CREATING",
2296 "UPDATING",
2297 "CREATE_FAILED",
2298 "UPDATE_FAILED",
2299 ];
2300 if !valid.contains(&s) {
2301 return Err(AwsServiceError::aws_error(
2302 StatusCode::BAD_REQUEST,
2303 "ValidationException",
2304 format!(
2305 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2306 s
2307 ),
2308 ));
2309 }
2310 }
2311
2312 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2313 let offset: usize = body["NextToken"]
2314 .as_str()
2315 .and_then(|t| t.parse().ok())
2316 .unwrap_or(0);
2317
2318 let state = self.state.read();
2319 let filtered: Vec<Value> = state
2320 .archives
2321 .values()
2322 .filter(|a| {
2323 if let Some(prefix) = name_prefix {
2324 a.name.starts_with(prefix)
2325 } else if let Some(arn) = source_arn {
2326 a.event_source_arn == arn
2327 } else if let Some(s) = archive_state {
2328 a.state == s
2329 } else {
2330 true
2331 }
2332 })
2333 .skip(offset)
2334 .take(limit + 1)
2335 .map(|a| {
2336 json!({
2337 "ArchiveName": a.name,
2338 "CreationTime": a.creation_time.timestamp() as f64,
2339 "EventCount": a.event_count,
2340 "EventSourceArn": a.event_source_arn,
2341 "RetentionDays": a.retention_days,
2342 "SizeBytes": a.size_bytes,
2343 "State": a.state,
2344 })
2345 })
2346 .collect();
2347
2348 let has_more = filtered.len() > limit;
2349 let archives: Vec<Value> = filtered.into_iter().take(limit).collect();
2350 let mut resp = json!({ "Archives": archives });
2351 if has_more {
2352 resp["NextToken"] = json!((offset + limit).to_string());
2353 }
2354
2355 Ok(json_resp(resp))
2356 }
2357
2358 fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2359 let body = parse_body(req);
2360 validate_required("ArchiveName", &body["ArchiveName"])?;
2361 let name = body["ArchiveName"]
2362 .as_str()
2363 .ok_or_else(|| missing("ArchiveName"))?;
2364 validate_string_length("archiveName", name, 1, 48)?;
2365 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2366 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2367 if let Some(rd) = body["RetentionDays"].as_i64() {
2368 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2369 }
2370
2371 if let Some(pattern) = body["EventPattern"].as_str() {
2373 validate_event_pattern(pattern)?;
2374 }
2375
2376 let mut state = self.state.write();
2377 let archive = state.archives.get_mut(name).ok_or_else(|| {
2378 AwsServiceError::aws_error(
2379 StatusCode::BAD_REQUEST,
2380 "ResourceNotFoundException",
2381 format!("Archive {name} does not exist."),
2382 )
2383 })?;
2384
2385 if let Some(desc) = body["Description"].as_str() {
2386 archive.description = Some(desc.to_string());
2387 }
2388 if let Some(pattern) = body["EventPattern"].as_str() {
2389 archive.event_pattern = Some(pattern.to_string());
2390 }
2391 if let Some(days) = body["RetentionDays"].as_i64() {
2392 archive.retention_days = days;
2393 }
2394
2395 Ok(json_resp(json!({
2396 "ArchiveArn": archive.arn,
2397 "CreationTime": archive.creation_time.timestamp() as f64,
2398 "State": archive.state,
2399 })))
2400 }
2401
2402 fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2403 let body = parse_body(req);
2404 validate_required("ArchiveName", &body["ArchiveName"])?;
2405 let name = body["ArchiveName"]
2406 .as_str()
2407 .ok_or_else(|| missing("ArchiveName"))?;
2408 validate_string_length("archiveName", name, 1, 48)?;
2409
2410 let mut state = self.state.write();
2411 if !state.archives.contains_key(name) {
2412 return Err(AwsServiceError::aws_error(
2413 StatusCode::BAD_REQUEST,
2414 "ResourceNotFoundException",
2415 format!("Archive {name} does not exist."),
2416 ));
2417 }
2418
2419 state.archives.remove(name);
2420
2421 let rule_name = format!("Events-Archive-{name}");
2423 state.rules.retain(|k, _| k.1 != rule_name);
2424
2425 Ok(json_resp(json!({})))
2426 }
2427
2428 fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2431 let body = parse_body(req);
2432 validate_required("Name", &body["Name"])?;
2433 let name = body["Name"]
2434 .as_str()
2435 .ok_or_else(|| missing("Name"))?
2436 .to_string();
2437 validate_string_length("name", &name, 1, 64)?;
2438 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2439 validate_required("AuthorizationType", &body["AuthorizationType"])?;
2440 let description = body["Description"].as_str().map(|s| s.to_string());
2441 let auth_type = body["AuthorizationType"]
2442 .as_str()
2443 .ok_or_else(|| missing("AuthorizationType"))?
2444 .to_string();
2445 validate_enum(
2446 "authorizationType",
2447 &auth_type,
2448 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2449 )?;
2450 validate_optional_string_length(
2451 "kmsKeyIdentifier",
2452 body["KmsKeyIdentifier"].as_str(),
2453 0,
2454 2048,
2455 )?;
2456 validate_required("AuthParameters", &body["AuthParameters"])?;
2457 let auth_params = body["AuthParameters"].clone();
2458
2459 let mut state = self.state.write();
2460 let now = Utc::now();
2461 let conn_uuid = uuid::Uuid::new_v4();
2462 let arn = format!(
2463 "arn:aws:events:{}:{}:connection/{}/{}",
2464 req.region, state.account_id, name, conn_uuid
2465 );
2466 let secret_arn = format!(
2467 "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2468 req.region, state.account_id, name, conn_uuid
2469 );
2470
2471 let conn = Connection {
2472 name: name.clone(),
2473 arn: arn.clone(),
2474 description,
2475 authorization_type: auth_type.clone(),
2476 auth_parameters: auth_params,
2477 connection_state: "AUTHORIZED".to_string(),
2478 secret_arn: secret_arn.clone(),
2479 creation_time: now,
2480 last_modified_time: now,
2481 last_authorized_time: now,
2482 };
2483 state.connections.insert(name, conn);
2484
2485 Ok(json_resp(json!({
2486 "ConnectionArn": arn,
2487 "ConnectionState": "AUTHORIZED",
2488 "CreationTime": now.timestamp() as f64,
2489 "LastModifiedTime": now.timestamp() as f64,
2490 })))
2491 }
2492
2493 fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2494 let body = parse_body(req);
2495 validate_required("Name", &body["Name"])?;
2496 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2497 validate_string_length("name", name, 1, 64)?;
2498
2499 let state = self.state.read();
2500 let conn = state.connections.get(name).ok_or_else(|| {
2501 AwsServiceError::aws_error(
2502 StatusCode::BAD_REQUEST,
2503 "ResourceNotFoundException",
2504 format!("Connection '{name}' does not exist."),
2505 )
2506 })?;
2507
2508 let auth_params_response =
2510 build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2511
2512 let mut resp = json!({
2513 "ConnectionArn": conn.arn,
2514 "Name": conn.name,
2515 "AuthorizationType": conn.authorization_type,
2516 "AuthParameters": auth_params_response,
2517 "ConnectionState": conn.connection_state,
2518 "SecretArn": conn.secret_arn,
2519 "CreationTime": conn.creation_time.timestamp() as f64,
2520 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2521 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2522 });
2523 if let Some(ref desc) = conn.description {
2524 resp["Description"] = json!(desc);
2525 }
2526
2527 Ok(json_resp(resp))
2528 }
2529
2530 fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2531 let body = parse_body(req);
2532 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2533 validate_optional_enum(
2534 "connectionState",
2535 body["ConnectionState"].as_str(),
2536 &[
2537 "CREATING",
2538 "UPDATING",
2539 "DELETING",
2540 "AUTHORIZED",
2541 "DEAUTHORIZED",
2542 "AUTHORIZING",
2543 "DEAUTHORIZING",
2544 "ACTIVE",
2545 "FAILED_CONNECTIVITY",
2546 ],
2547 )?;
2548 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2549 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2550
2551 let name_prefix = body["NamePrefix"].as_str();
2552 let connection_state = body["ConnectionState"].as_str();
2553 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2554 let offset: usize = body["NextToken"]
2555 .as_str()
2556 .and_then(|t| t.parse().ok())
2557 .unwrap_or(0);
2558
2559 let state = self.state.read();
2560 let filtered: Vec<Value> = state
2561 .connections
2562 .values()
2563 .filter(|c| {
2564 if let Some(prefix) = name_prefix {
2565 if !c.name.starts_with(prefix) {
2566 return false;
2567 }
2568 }
2569 if let Some(cs) = connection_state {
2570 if c.connection_state != cs {
2571 return false;
2572 }
2573 }
2574 true
2575 })
2576 .skip(offset)
2577 .take(limit + 1)
2578 .map(|c| {
2579 json!({
2580 "ConnectionArn": c.arn,
2581 "Name": c.name,
2582 "AuthorizationType": c.authorization_type,
2583 "ConnectionState": c.connection_state,
2584 "CreationTime": c.creation_time.timestamp() as f64,
2585 "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2586 "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2587 })
2588 })
2589 .collect();
2590
2591 let has_more = filtered.len() > limit;
2592 let conns: Vec<Value> = filtered.into_iter().take(limit).collect();
2593 let mut resp = json!({ "Connections": conns });
2594 if has_more {
2595 resp["NextToken"] = json!((offset + limit).to_string());
2596 }
2597
2598 Ok(json_resp(resp))
2599 }
2600
2601 fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2602 let body = parse_body(req);
2603 validate_required("Name", &body["Name"])?;
2604 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2605 validate_string_length("name", name, 1, 64)?;
2606 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2607 validate_optional_enum(
2608 "authorizationType",
2609 body["AuthorizationType"].as_str(),
2610 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2611 )?;
2612
2613 let mut state = self.state.write();
2614 let conn = state.connections.get_mut(name).ok_or_else(|| {
2615 AwsServiceError::aws_error(
2616 StatusCode::BAD_REQUEST,
2617 "ResourceNotFoundException",
2618 format!("Connection '{name}' does not exist."),
2619 )
2620 })?;
2621
2622 if let Some(desc) = body["Description"].as_str() {
2623 conn.description = Some(desc.to_string());
2624 }
2625 if let Some(auth_type) = body["AuthorizationType"].as_str() {
2626 conn.authorization_type = auth_type.to_string();
2627 }
2628 if body.get("AuthParameters").is_some() {
2629 conn.auth_parameters = body["AuthParameters"].clone();
2630 }
2631 conn.last_modified_time = Utc::now();
2632
2633 Ok(json_resp(json!({
2634 "ConnectionArn": conn.arn,
2635 "ConnectionState": conn.connection_state,
2636 "CreationTime": conn.creation_time.timestamp() as f64,
2637 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2638 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2639 })))
2640 }
2641
2642 fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2643 let body = parse_body(req);
2644 validate_required("Name", &body["Name"])?;
2645 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2646 validate_string_length("name", name, 1, 64)?;
2647
2648 let mut state = self.state.write();
2649 let conn = state.connections.remove(name).ok_or_else(|| {
2650 AwsServiceError::aws_error(
2651 StatusCode::BAD_REQUEST,
2652 "ResourceNotFoundException",
2653 format!("Connection '{name}' does not exist."),
2654 )
2655 })?;
2656
2657 Ok(json_resp(json!({
2658 "ConnectionArn": conn.arn,
2659 "ConnectionState": conn.connection_state,
2660 "CreationTime": conn.creation_time.timestamp() as f64,
2661 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2662 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2663 })))
2664 }
2665
2666 fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2669 let body = parse_body(req);
2670 validate_required("Name", &body["Name"])?;
2671 let name = body["Name"]
2672 .as_str()
2673 .ok_or_else(|| missing("Name"))?
2674 .to_string();
2675 validate_string_length("name", &name, 1, 64)?;
2676 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2677 validate_required("ConnectionArn", &body["ConnectionArn"])?;
2678 let description = body["Description"].as_str().map(|s| s.to_string());
2679 let connection_arn = body["ConnectionArn"]
2680 .as_str()
2681 .ok_or_else(|| missing("ConnectionArn"))?
2682 .to_string();
2683 validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2684 validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2685 let endpoint = body["InvocationEndpoint"]
2686 .as_str()
2687 .ok_or_else(|| missing("InvocationEndpoint"))?
2688 .to_string();
2689 validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2690 validate_required("HttpMethod", &body["HttpMethod"])?;
2691 let http_method = body["HttpMethod"]
2692 .as_str()
2693 .ok_or_else(|| missing("HttpMethod"))?
2694 .to_string();
2695 validate_enum(
2696 "httpMethod",
2697 &http_method,
2698 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2699 )?;
2700 let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2701 if let Some(r) = rate_limit {
2702 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2703 }
2704
2705 let mut state = self.state.write();
2706 let now = Utc::now();
2707 let dest_uuid = uuid::Uuid::new_v4();
2708 let arn = format!(
2709 "arn:aws:events:{}:{}:api-destination/{}/{}",
2710 req.region, state.account_id, name, dest_uuid
2711 );
2712
2713 let dest = ApiDestination {
2714 name: name.clone(),
2715 arn: arn.clone(),
2716 description,
2717 connection_arn,
2718 invocation_endpoint: endpoint,
2719 http_method,
2720 invocation_rate_limit_per_second: rate_limit,
2721 state: "ACTIVE".to_string(),
2722 creation_time: now,
2723 last_modified_time: now,
2724 };
2725 state.api_destinations.insert(name, dest);
2726
2727 Ok(json_resp(json!({
2728 "ApiDestinationArn": arn,
2729 "ApiDestinationState": "ACTIVE",
2730 "CreationTime": now.timestamp() as f64,
2731 "LastModifiedTime": now.timestamp() as f64,
2732 })))
2733 }
2734
2735 fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2736 let body = parse_body(req);
2737 validate_required("Name", &body["Name"])?;
2738 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2739 validate_string_length("name", name, 1, 64)?;
2740
2741 let state = self.state.read();
2742 let dest = state.api_destinations.get(name).ok_or_else(|| {
2743 AwsServiceError::aws_error(
2744 StatusCode::BAD_REQUEST,
2745 "ResourceNotFoundException",
2746 format!("An api-destination '{name}' does not exist."),
2747 )
2748 })?;
2749
2750 let mut resp = json!({
2751 "ApiDestinationArn": dest.arn,
2752 "Name": dest.name,
2753 "ConnectionArn": dest.connection_arn,
2754 "InvocationEndpoint": dest.invocation_endpoint,
2755 "HttpMethod": dest.http_method,
2756 "ApiDestinationState": dest.state,
2757 "CreationTime": dest.creation_time.timestamp() as f64,
2758 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2759 });
2760 if let Some(ref desc) = dest.description {
2761 resp["Description"] = json!(desc);
2762 }
2763 if let Some(rate) = dest.invocation_rate_limit_per_second {
2764 resp["InvocationRateLimitPerSecond"] = json!(rate);
2765 }
2766
2767 Ok(json_resp(resp))
2768 }
2769
2770 fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2771 let body = parse_body(req);
2772 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2773 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2774 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2775 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2776
2777 let name_prefix = body["NamePrefix"].as_str();
2778 let connection_arn = body["ConnectionArn"].as_str();
2779 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2780 let offset: usize = body["NextToken"]
2781 .as_str()
2782 .and_then(|t| t.parse().ok())
2783 .unwrap_or(0);
2784
2785 let state = self.state.read();
2786 let filtered: Vec<Value> = state
2787 .api_destinations
2788 .values()
2789 .filter(|d| {
2790 if let Some(prefix) = name_prefix {
2791 if !d.name.starts_with(prefix) {
2792 return false;
2793 }
2794 }
2795 if let Some(arn) = connection_arn {
2796 if d.connection_arn != arn {
2797 return false;
2798 }
2799 }
2800 true
2801 })
2802 .skip(offset)
2803 .take(limit + 1)
2804 .map(|d| {
2805 let mut obj = json!({
2806 "ApiDestinationArn": d.arn,
2807 "Name": d.name,
2808 "ConnectionArn": d.connection_arn,
2809 "InvocationEndpoint": d.invocation_endpoint,
2810 "HttpMethod": d.http_method,
2811 "ApiDestinationState": d.state,
2812 "CreationTime": d.creation_time.timestamp() as f64,
2813 "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2814 });
2815 if let Some(rate) = d.invocation_rate_limit_per_second {
2816 obj["InvocationRateLimitPerSecond"] = json!(rate);
2817 }
2818 obj
2819 })
2820 .collect();
2821
2822 let has_more = filtered.len() > limit;
2823 let dests: Vec<Value> = filtered.into_iter().take(limit).collect();
2824 let mut resp = json!({ "ApiDestinations": dests });
2825 if has_more {
2826 resp["NextToken"] = json!((offset + limit).to_string());
2827 }
2828
2829 Ok(json_resp(resp))
2830 }
2831
2832 fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2833 let body = parse_body(req);
2834 validate_required("Name", &body["Name"])?;
2835 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2836 validate_string_length("name", name, 1, 64)?;
2837 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2838 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2839 validate_optional_string_length(
2840 "invocationEndpoint",
2841 body["InvocationEndpoint"].as_str(),
2842 1,
2843 2048,
2844 )?;
2845 validate_optional_enum(
2846 "httpMethod",
2847 body["HttpMethod"].as_str(),
2848 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2849 )?;
2850 if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
2851 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2852 }
2853
2854 let mut state = self.state.write();
2855 let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
2856 AwsServiceError::aws_error(
2857 StatusCode::BAD_REQUEST,
2858 "ResourceNotFoundException",
2859 format!("An api-destination '{name}' does not exist."),
2860 )
2861 })?;
2862
2863 if let Some(desc) = body["Description"].as_str() {
2864 dest.description = Some(desc.to_string());
2865 }
2866 if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
2867 dest.invocation_endpoint = endpoint.to_string();
2868 }
2869 if let Some(method) = body["HttpMethod"].as_str() {
2870 dest.http_method = method.to_string();
2871 }
2872 if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
2873 dest.invocation_rate_limit_per_second = Some(rate);
2874 }
2875 if let Some(conn) = body["ConnectionArn"].as_str() {
2876 dest.connection_arn = conn.to_string();
2877 }
2878 dest.last_modified_time = Utc::now();
2879
2880 Ok(json_resp(json!({
2881 "ApiDestinationArn": dest.arn,
2882 "ApiDestinationState": dest.state,
2883 "CreationTime": dest.creation_time.timestamp() as f64,
2884 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2885 })))
2886 }
2887
2888 fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2889 let body = parse_body(req);
2890 validate_required("Name", &body["Name"])?;
2891 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2892 validate_string_length("name", name, 1, 64)?;
2893
2894 let mut state = self.state.write();
2895 if !state.api_destinations.contains_key(name) {
2896 return Err(AwsServiceError::aws_error(
2897 StatusCode::BAD_REQUEST,
2898 "ResourceNotFoundException",
2899 format!("An api-destination '{name}' does not exist."),
2900 ));
2901 }
2902 state.api_destinations.remove(name);
2903
2904 Ok(json_resp(json!({})))
2905 }
2906
2907 fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2910 let body = parse_body(req);
2911 validate_required("ReplayName", &body["ReplayName"])?;
2912 let name = body["ReplayName"]
2913 .as_str()
2914 .ok_or_else(|| missing("ReplayName"))?
2915 .to_string();
2916 validate_string_length("replayName", &name, 1, 64)?;
2917 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2918 validate_required("EventSourceArn", &body["EventSourceArn"])?;
2919 let description = body["Description"].as_str().map(|s| s.to_string());
2920 let event_source_arn = body["EventSourceArn"]
2921 .as_str()
2922 .ok_or_else(|| missing("EventSourceArn"))?
2923 .to_string();
2924 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2925 validate_required("EventStartTime", &body["EventStartTime"])?;
2926 validate_required("EventEndTime", &body["EventEndTime"])?;
2927 validate_required("Destination", &body["Destination"])?;
2928 let destination = body["Destination"].clone();
2929 let event_start_time_f = body["EventStartTime"].as_f64();
2930 let event_end_time_f = body["EventEndTime"].as_f64();
2931
2932 let event_start_time = event_start_time_f
2933 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2934 .unwrap_or_else(Utc::now);
2935 let event_end_time = event_end_time_f
2936 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2937 .unwrap_or_else(Utc::now);
2938
2939 let dest_arn = destination["Arn"].as_str().unwrap_or("");
2941 if !dest_arn.contains(":event-bus/") {
2942 return Err(AwsServiceError::aws_error(
2943 StatusCode::BAD_REQUEST,
2944 "ValidationException",
2945 "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
2946 ));
2947 }
2948
2949 let mut state = self.state.write();
2950
2951 let bus_name = state.resolve_bus_name(dest_arn);
2953 if !state.buses.contains_key(&bus_name) {
2954 return Err(AwsServiceError::aws_error(
2955 StatusCode::BAD_REQUEST,
2956 "ResourceNotFoundException",
2957 format!("Event bus {bus_name} does not exist."),
2958 ));
2959 }
2960
2961 let archive_name = event_source_arn
2963 .rsplit_once("archive/")
2964 .map(|(_, n)| n.to_string())
2965 .unwrap_or_default();
2966 if !state.archives.contains_key(&archive_name) {
2967 return Err(AwsServiceError::aws_error(
2968 StatusCode::BAD_REQUEST,
2969 "ValidationException",
2970 format!(
2971 "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
2972 ),
2973 ));
2974 }
2975
2976 let archive = state.archives.get(&archive_name).unwrap();
2978 let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
2979 if archive_bus != bus_name {
2980 return Err(AwsServiceError::aws_error(
2981 StatusCode::BAD_REQUEST,
2982 "ValidationException",
2983 "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
2984 ));
2985 }
2986
2987 if event_end_time <= event_start_time {
2989 return Err(AwsServiceError::aws_error(
2990 StatusCode::BAD_REQUEST,
2991 "ValidationException",
2992 "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
2993 ));
2994 }
2995
2996 if state.replays.contains_key(&name) {
2998 return Err(AwsServiceError::aws_error(
2999 StatusCode::BAD_REQUEST,
3000 "ResourceAlreadyExistsException",
3001 format!("Replay {name} already exists."),
3002 ));
3003 }
3004
3005 let now = Utc::now();
3006 let arn = format!(
3007 "arn:aws:events:{}:{}:replay/{}",
3008 req.region, state.account_id, name
3009 );
3010
3011 let replay = Replay {
3012 name: name.clone(),
3013 arn: arn.clone(),
3014 description,
3015 event_source_arn,
3016 destination,
3017 event_start_time,
3018 event_end_time,
3019 state: "COMPLETED".to_string(), replay_start_time: now,
3021 replay_end_time: Some(now),
3022 };
3023 state.replays.insert(name, replay);
3024
3025 Ok(json_resp(json!({
3026 "ReplayArn": arn,
3027 "ReplayStartTime": now.timestamp() as f64,
3028 "State": "STARTING",
3029 })))
3030 }
3031
3032 fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3033 let body = parse_body(req);
3034 validate_required("ReplayName", &body["ReplayName"])?;
3035 let name = body["ReplayName"]
3036 .as_str()
3037 .ok_or_else(|| missing("ReplayName"))?;
3038 validate_string_length("replayName", name, 1, 64)?;
3039
3040 let state = self.state.read();
3041 let replay = state.replays.get(name).ok_or_else(|| {
3042 AwsServiceError::aws_error(
3043 StatusCode::BAD_REQUEST,
3044 "ResourceNotFoundException",
3045 format!("Replay {name} does not exist."),
3046 )
3047 })?;
3048
3049 let mut resp = json!({
3050 "Destination": replay.destination,
3051 "EventSourceArn": replay.event_source_arn,
3052 "EventStartTime": replay.event_start_time.timestamp() as f64,
3053 "EventEndTime": replay.event_end_time.timestamp() as f64,
3054 "ReplayArn": replay.arn,
3055 "ReplayName": replay.name,
3056 "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3057 "State": replay.state,
3058 });
3059 if let Some(ref desc) = replay.description {
3060 resp["Description"] = json!(desc);
3061 }
3062 if let Some(ref end) = replay.replay_end_time {
3063 resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3064 }
3065
3066 Ok(json_resp(resp))
3067 }
3068
3069 fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3070 let body = parse_body(req);
3071 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3072 validate_optional_string_length(
3073 "eventSourceArn",
3074 body["EventSourceArn"].as_str(),
3075 1,
3076 1600,
3077 )?;
3078 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3079 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3080 let name_prefix = body["NamePrefix"].as_str();
3081 let source_arn = body["EventSourceArn"].as_str();
3082 let replay_state = body["State"].as_str();
3083
3084 let filter_count = [
3086 name_prefix.is_some(),
3087 source_arn.is_some(),
3088 replay_state.is_some(),
3089 ]
3090 .iter()
3091 .filter(|&&x| x)
3092 .count();
3093 if filter_count > 1 {
3094 return Err(AwsServiceError::aws_error(
3095 StatusCode::BAD_REQUEST,
3096 "ValidationException",
3097 "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3098 ));
3099 }
3100
3101 if let Some(s) = replay_state {
3103 let valid = [
3104 "CANCELLED",
3105 "CANCELLING",
3106 "COMPLETED",
3107 "FAILED",
3108 "RUNNING",
3109 "STARTING",
3110 ];
3111 if !valid.contains(&s) {
3112 return Err(AwsServiceError::aws_error(
3113 StatusCode::BAD_REQUEST,
3114 "ValidationException",
3115 format!(
3116 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3117 s
3118 ),
3119 ));
3120 }
3121 }
3122
3123 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3124 let offset: usize = body["NextToken"]
3125 .as_str()
3126 .and_then(|t| t.parse().ok())
3127 .unwrap_or(0);
3128
3129 let state = self.state.read();
3130 let filtered: Vec<Value> = state
3131 .replays
3132 .values()
3133 .filter(|r| {
3134 if let Some(prefix) = name_prefix {
3135 r.name.starts_with(prefix)
3136 } else if let Some(arn) = source_arn {
3137 r.event_source_arn == arn
3138 } else if let Some(s) = replay_state {
3139 r.state == s
3140 } else {
3141 true
3142 }
3143 })
3144 .skip(offset)
3145 .take(limit + 1)
3146 .map(|r| {
3147 let mut obj = json!({
3148 "EventSourceArn": r.event_source_arn,
3149 "EventStartTime": r.event_start_time.timestamp() as f64,
3150 "EventEndTime": r.event_end_time.timestamp() as f64,
3151 "ReplayName": r.name,
3152 "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3153 "State": r.state,
3154 });
3155 if let Some(ref end) = r.replay_end_time {
3156 obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3157 }
3158 obj
3159 })
3160 .collect();
3161
3162 let has_more = filtered.len() > limit;
3163 let replays: Vec<Value> = filtered.into_iter().take(limit).collect();
3164 let mut resp = json!({ "Replays": replays });
3165 if has_more {
3166 resp["NextToken"] = json!((offset + limit).to_string());
3167 }
3168
3169 Ok(json_resp(resp))
3170 }
3171
3172 fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3173 let body = parse_body(req);
3174 validate_required("ReplayName", &body["ReplayName"])?;
3175 let name = body["ReplayName"]
3176 .as_str()
3177 .ok_or_else(|| missing("ReplayName"))?;
3178 validate_string_length("replayName", name, 1, 64)?;
3179
3180 let mut state = self.state.write();
3181 let replay = state.replays.get_mut(name).ok_or_else(|| {
3182 AwsServiceError::aws_error(
3183 StatusCode::BAD_REQUEST,
3184 "ResourceNotFoundException",
3185 format!("Replay {name} does not exist."),
3186 )
3187 })?;
3188
3189 if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3191 return Err(AwsServiceError::aws_error(
3192 StatusCode::BAD_REQUEST,
3193 "IllegalStatusException",
3194 format!("Replay {name} is not in a valid state for this operation."),
3195 ));
3196 }
3197
3198 let arn = replay.arn.clone();
3199 replay.state = "CANCELLED".to_string();
3200
3201 Ok(json_resp(json!({
3202 "ReplayArn": arn,
3203 "State": "CANCELLING",
3204 })))
3205 }
3206}
3207
3208fn find_tags_mut<'a>(
3211 state: &'a mut crate::state::EventBridgeState,
3212 arn: &str,
3213) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3214 for bus in state.buses.values_mut() {
3216 if bus.arn == arn {
3217 return Ok(&mut bus.tags);
3218 }
3219 }
3220 for rule in state.rules.values_mut() {
3222 if rule.arn == arn {
3223 return Ok(&mut rule.tags);
3224 }
3225 }
3226
3227 let error_msg = if arn.contains(":rule/") {
3229 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3231 if let Some(rule_path) = parts.first() {
3232 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3233 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3234 } else {
3235 format!("Rule {} does not exist on EventBus default.", rule_path)
3236 }
3237 } else {
3238 format!("Resource {arn} not found.")
3239 }
3240 } else {
3241 format!("Resource {arn} not found.")
3242 };
3243
3244 Err(AwsServiceError::aws_error(
3245 StatusCode::BAD_REQUEST,
3246 "ResourceNotFoundException",
3247 error_msg,
3248 ))
3249}
3250
3251fn find_tags<'a>(
3252 state: &'a crate::state::EventBridgeState,
3253 arn: &str,
3254) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3255 for bus in state.buses.values() {
3256 if bus.arn == arn {
3257 return Ok(&bus.tags);
3258 }
3259 }
3260 for rule in state.rules.values() {
3261 if rule.arn == arn {
3262 return Ok(&rule.tags);
3263 }
3264 }
3265
3266 let error_msg = if arn.contains(":rule/") {
3267 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3268 if let Some(rule_path) = parts.first() {
3269 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3270 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3271 } else {
3272 format!("Rule {} does not exist on EventBus default.", rule_path)
3273 }
3274 } else {
3275 format!("Resource {arn} not found.")
3276 }
3277 } else {
3278 format!("Resource {arn} not found.")
3279 };
3280
3281 Err(AwsServiceError::aws_error(
3282 StatusCode::BAD_REQUEST,
3283 "ResourceNotFoundException",
3284 error_msg,
3285 ))
3286}
3287
3288fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3291 let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3292 AwsServiceError::aws_error(
3293 StatusCode::BAD_REQUEST,
3294 "InvalidEventPatternException",
3295 "Event pattern is not valid. Reason: Invalid JSON",
3296 )
3297 })?;
3298
3299 validate_pattern_values(&parsed, "")?;
3300 Ok(())
3301}
3302
3303fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3304 match value {
3305 Value::Object(obj) => {
3306 for (key, val) in obj {
3307 let new_path = if path.is_empty() {
3308 key.clone()
3309 } else {
3310 format!("{path}.{key}")
3311 };
3312 match val {
3313 Value::Object(_) => validate_pattern_values(val, &new_path)?,
3314 Value::Array(_) => {} _ => {
3316 return Err(AwsServiceError::aws_error(
3317 StatusCode::BAD_REQUEST,
3318 "InvalidEventPatternException",
3319 format!(
3320 "Event pattern is not valid. Reason: '{}' must be an object or an array",
3321 key
3322 ),
3323 ));
3324 }
3325 }
3326 }
3327 Ok(())
3328 }
3329 _ => Ok(()),
3330 }
3331}
3332
3333fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3336 match auth_type {
3337 "API_KEY" => {
3338 let mut resp = json!({});
3339 if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3340 resp["ApiKeyAuthParameters"] = json!({
3341 "ApiKeyName": api_key["ApiKeyName"],
3342 });
3343 }
3344 resp
3345 }
3346 "BASIC" => {
3347 let mut resp = json!({});
3348 if let Some(basic) = params.get("BasicAuthParameters") {
3349 resp["BasicAuthParameters"] = json!({
3350 "Username": basic["Username"],
3351 });
3352 }
3353 resp
3354 }
3355 "OAUTH_CLIENT_CREDENTIALS" => {
3356 let mut resp = json!({});
3357 if let Some(oauth) = params.get("OAuthParameters") {
3358 resp["OAuthParameters"] = json!({
3359 "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3360 "HttpMethod": oauth["HttpMethod"],
3361 "ClientParameters": {
3362 "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3363 },
3364 });
3365 }
3366 resp
3367 }
3368 _ => params.clone(),
3369 }
3370}
3371
3372fn matches_pattern(
3376 pattern_json: Option<&str>,
3377 source: &str,
3378 detail_type: &str,
3379 detail: &str,
3380 account: &str,
3381 region: &str,
3382 resources: &[String],
3383) -> bool {
3384 let pattern_json = match pattern_json {
3385 Some(p) => p,
3386 None => return true,
3387 };
3388
3389 let pattern: Value = match serde_json::from_str(pattern_json) {
3390 Ok(v) => v,
3391 Err(_) => return false,
3392 };
3393
3394 let pattern_obj = match pattern.as_object() {
3395 Some(o) => o,
3396 None => return false,
3397 };
3398
3399 let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3400 let event = json!({
3401 "source": source,
3402 "detail-type": detail_type,
3403 "detail": detail_value,
3404 "account": account,
3405 "region": region,
3406 "resources": resources,
3407 });
3408
3409 for (key, pattern_value) in pattern_obj {
3410 let event_value = &event[key];
3411 if !matches_value(pattern_value, event_value) {
3412 return false;
3413 }
3414 }
3415
3416 true
3417}
3418
3419fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3420 match pattern {
3421 Value::Object(obj) => {
3422 for (key, sub_pattern) in obj {
3423 let sub_value = &event_value[key];
3424 if !matches_value(sub_pattern, sub_value) {
3425 return false;
3426 }
3427 }
3428 true
3429 }
3430 Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3431 _ => false,
3432 }
3433}
3434
3435fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3436 match pattern_elem {
3437 Value::Object(obj) => {
3438 if let Some(prefix_val) = obj.get("prefix") {
3439 if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3440 return actual.starts_with(prefix);
3441 }
3442 return false;
3443 }
3444 if let Some(exists_val) = obj.get("exists") {
3445 let should_exist = exists_val.as_bool().unwrap_or(true);
3446 let does_exist = !event_value.is_null();
3447 return should_exist == does_exist;
3448 }
3449 if let Some(anything_but_val) = obj.get("anything-but") {
3450 return match anything_but_val {
3451 Value::String(s) => event_value.as_str() != Some(s.as_str()),
3452 Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3453 Value::Number(_) => event_value != anything_but_val,
3454 _ => true,
3455 };
3456 }
3457 if let Some(numeric_val) = obj.get("numeric") {
3458 return matches_numeric(numeric_val, event_value);
3459 }
3460 false
3461 }
3462 _ => values_equal(pattern_elem, event_value),
3463 }
3464}
3465
3466fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3467 let arr = match numeric_arr.as_array() {
3468 Some(a) => a,
3469 None => return false,
3470 };
3471 let actual = match event_value.as_f64() {
3472 Some(n) => n,
3473 None => return false,
3474 };
3475 let mut i = 0;
3476 while i + 1 < arr.len() {
3477 let op = match arr[i].as_str() {
3478 Some(s) => s,
3479 None => return false,
3480 };
3481 let threshold = match arr[i + 1].as_f64() {
3482 Some(n) => n,
3483 None => return false,
3484 };
3485 let ok = match op {
3486 ">" => actual > threshold,
3487 ">=" => actual >= threshold,
3488 "<" => actual < threshold,
3489 "<=" => actual <= threshold,
3490 "=" => (actual - threshold).abs() < f64::EPSILON,
3491 _ => return false,
3492 };
3493 if !ok {
3494 return false;
3495 }
3496 i += 2;
3497 }
3498 true
3499}
3500
3501fn values_equal(a: &Value, b: &Value) -> bool {
3502 a == b
3503}
3504
3505fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3507 let path = path.strip_prefix('$').unwrap_or(path);
3508 let mut current = event;
3509 for segment in path.split('.') {
3510 if segment.is_empty() {
3511 continue;
3512 }
3513 current = current.get(segment)?;
3514 }
3515 Some(current.clone())
3516}
3517
3518fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3520 let input_paths_map = transformer
3521 .get("InputPathsMap")
3522 .and_then(|v| v.as_object())
3523 .cloned()
3524 .unwrap_or_default();
3525 let template = transformer
3526 .get("InputTemplate")
3527 .and_then(|v| v.as_str())
3528 .unwrap_or("")
3529 .to_string();
3530
3531 let mut resolved: HashMap<String, Value> = HashMap::new();
3533 for (var_name, path_val) in &input_paths_map {
3534 if let Some(path_str) = path_val.as_str() {
3535 if let Some(val) = resolve_json_path(event, path_str) {
3536 resolved.insert(var_name.clone(), val);
3537 }
3538 }
3539 }
3540
3541 let mut result = template;
3543 for (var_name, val) in &resolved {
3544 let placeholder = format!("<{var_name}>");
3545 let replacement = match val {
3546 Value::String(s) => s.clone(),
3547 other => other.to_string(),
3548 };
3549 result = result.replace(&placeholder, &replacement);
3550 }
3551
3552 result
3553}
3554
3555fn missing(name: &str) -> AwsServiceError {
3556 AwsServiceError::aws_error(
3557 StatusCode::BAD_REQUEST,
3558 "ValidationException",
3559 format!("The request must contain the parameter {name}"),
3560 )
3561}
3562
3563pub fn deliver_to_logs(
3566 logs_state: &SharedLogsState,
3567 log_group_arn: &str,
3568 payload: &str,
3569 timestamp: chrono::DateTime<chrono::Utc>,
3570) {
3571 let group_name = if log_group_arn.contains(":log-group:") {
3574 log_group_arn
3575 .split(":log-group:")
3576 .nth(1)
3577 .unwrap_or(log_group_arn)
3578 .trim_end_matches(":*")
3579 } else {
3580 log_group_arn
3581 };
3582
3583 let stream_name = "events".to_string();
3584 let ts_millis = timestamp.timestamp_millis();
3585
3586 let mut state = logs_state.write();
3587 let region = state.region.clone();
3588 let account_id = state.account_id.clone();
3589
3590 let group = state
3592 .log_groups
3593 .entry(group_name.to_string())
3594 .or_insert_with(|| fakecloud_logs::state::LogGroup {
3595 name: group_name.to_string(),
3596 arn: format!("arn:aws:logs:{region}:{account_id}:log-group:{group_name}"),
3597 creation_time: ts_millis,
3598 retention_in_days: None,
3599 kms_key_id: None,
3600 tags: HashMap::new(),
3601 log_streams: HashMap::new(),
3602 stored_bytes: 0,
3603 subscription_filters: Vec::new(),
3604 data_protection_policy: None,
3605 index_policies: Vec::new(),
3606 transformer: None,
3607 deletion_protection: false,
3608 });
3609
3610 let stream = group
3611 .log_streams
3612 .entry(stream_name.clone())
3613 .or_insert_with(|| fakecloud_logs::state::LogStream {
3614 name: stream_name,
3615 arn: format!("{}:log-stream:events", group.arn),
3616 creation_time: ts_millis,
3617 first_event_timestamp: None,
3618 last_event_timestamp: None,
3619 last_ingestion_time: None,
3620 upload_sequence_token: "1".to_string(),
3621 events: Vec::new(),
3622 });
3623
3624 stream.events.push(fakecloud_logs::state::LogEvent {
3625 timestamp: ts_millis,
3626 message: payload.to_string(),
3627 ingestion_time: ts_millis,
3628 });
3629 stream.last_event_timestamp = Some(ts_millis);
3630 stream.last_ingestion_time = Some(ts_millis);
3631 if stream.first_event_timestamp.is_none() {
3632 stream.first_event_timestamp = Some(ts_millis);
3633 }
3634}
3635
3636#[cfg(test)]
3637mod tests {
3638 use super::*;
3639
3640 fn test_matches(
3642 pattern_json: Option<&str>,
3643 source: &str,
3644 detail_type: &str,
3645 detail: &str,
3646 ) -> bool {
3647 matches_pattern(
3648 pattern_json,
3649 source,
3650 detail_type,
3651 detail,
3652 "123456789012",
3653 "us-east-1",
3654 &[],
3655 )
3656 }
3657
/// A `source` pattern matches only events from a listed source.
#[test]
fn pattern_matches_source() {
    let check = |pattern: &str| test_matches(Some(pattern), "my.app", "OrderPlaced", "{}");

    assert!(check(r#"{"source": ["my.app"]}"#));
    assert!(!check(r#"{"source": ["other.app"]}"#));
}
3673
/// A `detail-type` pattern matches only events with a listed detail type.
#[test]
fn pattern_matches_detail_type() {
    let check = |pattern: &str| test_matches(Some(pattern), "my.app", "OrderPlaced", "{}");

    assert!(check(r#"{"detail-type": ["OrderPlaced"]}"#));
    assert!(!check(r#"{"detail-type": ["OrderShipped"]}"#));
}
3689
/// A pattern on a `detail` field matches on that field's value.
#[test]
fn pattern_matches_detail_field() {
    let pattern = r#"{"detail": {"status": ["ACTIVE"]}}"#;
    let check = |detail: &str| test_matches(Some(pattern), "my.app", "StatusChange", detail);

    assert!(check(r#"{"status": "ACTIVE"}"#));
    assert!(!check(r#"{"status": "INACTIVE"}"#));
}
3705
/// Without a pattern every event matches.
#[test]
fn no_pattern_matches_everything() {
    let matched = test_matches(None, "any", "any", "{}");
    assert!(matched);
}
3710
/// All top-level keys of a pattern must match at the same time.
#[test]
fn combined_pattern() {
    let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
    let check = |source: &str, detail_type: &str| {
        test_matches(Some(pattern), source, detail_type, "{}")
    };

    assert!(check("orders", "OrderPlaced"));
    assert!(!check("orders", "OrderShipped"));
    assert!(!check("other", "OrderPlaced"));
}
3718
/// Patterns descend into nested `detail` objects; a missing field does not match.
#[test]
fn nested_detail_pattern() {
    let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
    let check = |detail: &str| test_matches(Some(pattern), "my.app", "OrderEvent", detail);

    assert!(check(r#"{"order": {"status": "PLACED", "id": "123"}}"#));
    assert!(!check(r#"{"order": {"status": "SHIPPED", "id": "123"}}"#));
    assert!(!check(r#"{"order": {"id": "123"}}"#));
}
3741
/// Nesting works beyond two levels.
#[test]
fn deeply_nested_detail_pattern() {
    let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
    let check = |detail: &str| test_matches(Some(pattern), "src", "type", detail);

    assert!(check(r#"{"a": {"b": {"c": "deep"}}}"#));
    assert!(!check(r#"{"a": {"b": {"c": "shallow"}}}"#));
}
3758
/// `prefix` matches the exact prefix and anything that extends it.
#[test]
fn prefix_matcher() {
    let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
    let check = |source: &str| test_matches(Some(pattern), source, "OrderPlaced", "{}");

    assert!(check("com.myapp.orders"));
    assert!(check("com.myapp"));
    assert!(!check("com.other"));
}
3781
/// `prefix` also works on fields inside `detail`.
#[test]
fn prefix_matcher_in_detail() {
    let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
    let check = |detail: &str| test_matches(Some(pattern), "src", "type", detail);

    assert!(check(r#"{"region": "us-east-1"}"#));
    assert!(!check(r#"{"region": "eu-west-1"}"#));
}
3798
/// `exists: true` requires the field, `exists: false` requires its absence.
#[test]
fn exists_matcher() {
    let with_error = r#"{"error": "something broke"}"#;
    let without_error = r#"{"status": "ok"}"#;
    let check = |pattern: &str, detail: &str| test_matches(Some(pattern), "src", "type", detail);

    let must_exist = r#"{"detail": {"error": [{"exists": true}]}}"#;
    assert!(check(must_exist, with_error));
    assert!(!check(must_exist, without_error));

    let must_be_absent = r#"{"detail": {"error": [{"exists": false}]}}"#;
    assert!(check(must_be_absent, without_error));
    assert!(!check(must_be_absent, with_error));
}
3829
/// `anything-but` accepts a single excluded value or a list of them.
#[test]
fn anything_but_matcher() {
    let check = |pattern: &str, source: &str| test_matches(Some(pattern), source, "Event", "{}");

    let single = r#"{"source": [{"anything-but": "internal"}]}"#;
    assert!(check(single, "external"));
    assert!(!check(single, "internal"));

    let list = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
    assert!(check(list, "external"));
    assert!(!check(list, "internal"));
    assert!(!check(list, "test"));
}
3841
/// `anything-but` also works on fields inside `detail`.
#[test]
fn anything_but_in_detail() {
    let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
    let check = |detail: &str| test_matches(Some(pattern), "src", "type", detail);

    assert!(check(r#"{"env": "staging"}"#));
    assert!(!check(r#"{"env": "prod"}"#));
}
3858
/// `numeric` with `>` is a strict comparison.
#[test]
fn numeric_greater_than() {
    let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
    let cases = [
        (r#"{"count": 150}"#, true),
        (r#"{"count": 100}"#, false),
        (r#"{"count": 50}"#, false),
    ];

    for &(detail, expected) in cases.iter() {
        let matched = test_matches(Some(pattern), "src", "type", detail);
        assert_eq!(matched, expected, "detail: {}", detail);
    }
}
3881
/// `numeric` with `<` is a strict comparison.
#[test]
fn numeric_less_than() {
    let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
    let cases = [
        (r#"{"count": 5}"#, true),
        (r#"{"count": 10}"#, false),
        (r#"{"count": 15}"#, false),
    ];

    for &(detail, expected) in cases.iter() {
        let matched = test_matches(Some(pattern), "src", "type", detail);
        assert_eq!(matched, expected, "detail: {}", detail);
    }
}
3904
/// Two `numeric` conditions form a half-open range: lower bound inclusive,
/// upper bound exclusive.
#[test]
fn numeric_range() {
    let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
    let cases = [
        (r#"{"count": 50}"#, true),
        (r#"{"count": 100}"#, true),
        (r#"{"count": 200}"#, false),
        (r#"{"count": 49}"#, false),
    ];

    for &(detail, expected) in cases.iter() {
        let matched = test_matches(Some(pattern), "src", "type", detail);
        assert_eq!(matched, expected, "detail: {}", detail);
    }
}
3933
/// Literal values and matcher objects can be mixed in one pattern array.
#[test]
fn mixed_matchers_and_literals() {
    let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
    let check = |source: &str| test_matches(Some(pattern), source, "Event", "{}");

    assert!(check("exact.match"));
    assert!(check("com.myapp.orders"));
    assert!(!check("other.source"));
}
3946
3947 use crate::state::EventBridgeState;
3950 use fakecloud_core::delivery::DeliveryBus;
3951 use parking_lot::RwLock;
3952
3953 fn make_service() -> EventBridgeService {
3954 let state = Arc::new(RwLock::new(EventBridgeState::new(
3955 "123456789012",
3956 "us-east-1",
3957 )));
3958 let delivery = Arc::new(DeliveryBus::new());
3959 EventBridgeService::new(state, delivery)
3960 }
3961
3962 fn make_request(action: &str, body: Value) -> AwsRequest {
3963 AwsRequest {
3964 service: "events".to_string(),
3965 action: action.to_string(),
3966 region: "us-east-1".to_string(),
3967 account_id: "123456789012".to_string(),
3968 request_id: "test-id".to_string(),
3969 headers: http::HeaderMap::new(),
3970 query_params: HashMap::new(),
3971 body: serde_json::to_vec(&body).unwrap().into(),
3972 path_segments: vec![],
3973 raw_path: "/".to_string(),
3974 method: http::Method::POST,
3975 is_query_protocol: false,
3976 access_key_id: None,
3977 }
3978 }
3979
3980 fn create_connection(svc: &EventBridgeService, name: &str) {
3981 let req = make_request(
3982 "CreateConnection",
3983 json!({
3984 "Name": name,
3985 "AuthorizationType": "API_KEY",
3986 "AuthParameters": {
3987 "ApiKeyAuthParameters": {
3988 "ApiKeyName": "x-api-key",
3989 "ApiKeyValue": "secret"
3990 }
3991 }
3992 }),
3993 );
3994 svc.create_connection(&req).unwrap();
3995 }
3996
3997 fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
3998 let conn_arn_field = {
3999 let state = svc.state.read();
4000 state.connections.get(conn_name).unwrap().arn.clone()
4001 };
4002 let req = make_request(
4003 "CreateApiDestination",
4004 json!({
4005 "Name": name,
4006 "ConnectionArn": conn_arn_field,
4007 "InvocationEndpoint": "https://example.com",
4008 "HttpMethod": "POST"
4009 }),
4010 );
4011 svc.create_api_destination(&req).unwrap();
4012 }
4013
/// Without filters or a limit, every connection is listed and no token is returned.
#[test]
fn list_connections_returns_all_by_default() {
    let svc = make_service();
    for name in ["conn-alpha", "conn-beta", "conn-gamma"].iter() {
        create_connection(&svc, name);
    }

    let resp = svc
        .list_connections(&make_request("ListConnections", json!({})))
        .unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();

    let listed = body["Connections"].as_array().unwrap();
    assert_eq!(listed.len(), 3);
    assert!(body["NextToken"].is_null());
}
4029
/// `NamePrefix` restricts the listing to connections with that prefix.
#[test]
fn list_connections_name_prefix_filter() {
    let svc = make_service();
    for name in ["prod-conn-1", "prod-conn-2", "dev-conn-1"].iter() {
        create_connection(&svc, name);
    }

    let resp = svc
        .list_connections(&make_request(
            "ListConnections",
            json!({ "NamePrefix": "prod-" }),
        ))
        .unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();

    let listed = body["Connections"].as_array().unwrap();
    assert_eq!(listed.len(), 2);
    for connection in listed {
        assert!(connection["Name"].as_str().unwrap().starts_with("prod-"));
    }
}
4049
/// `ConnectionState` restricts the listing to connections in that state.
#[test]
fn list_connections_state_filter() {
    let svc = make_service();
    create_connection(&svc, "conn-a");
    create_connection(&svc, "conn-b");

    // Change one connection's state directly in the store; the write guard
    // is a temporary and is released at the end of this statement.
    svc.state
        .write()
        .connections
        .get_mut("conn-b")
        .unwrap()
        .connection_state = String::from("DEAUTHORIZED");

    let resp = svc
        .list_connections(&make_request(
            "ListConnections",
            json!({ "ConnectionState": "AUTHORIZED" }),
        ))
        .unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();

    let listed = body["Connections"].as_array().unwrap();
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0]["Name"].as_str().unwrap(), "conn-a");
}
4076
/// Five connections with `Limit: 2` come back as pages of 2, 2 and 1, with
/// tokens `2`, `4` and then none.
#[test]
fn list_connections_pagination() {
    let svc = make_service();
    for i in 0..5 {
        create_connection(&svc, &format!("conn-{i:02}"));
    }

    let fetch = |body: Value| -> Value {
        let resp = svc
            .list_connections(&make_request("ListConnections", body))
            .unwrap();
        serde_json::from_slice(&resp.body).unwrap()
    };

    let first = fetch(json!({ "Limit": 2 }));
    assert_eq!(first["Connections"].as_array().unwrap().len(), 2);
    let first_token = first["NextToken"].as_str().unwrap();
    assert_eq!(first_token, "2");

    let second = fetch(json!({ "Limit": 2, "NextToken": first_token }));
    assert_eq!(second["Connections"].as_array().unwrap().len(), 2);
    let second_token = second["NextToken"].as_str().unwrap();
    assert_eq!(second_token, "4");

    let third = fetch(json!({ "Limit": 2, "NextToken": second_token }));
    assert_eq!(third["Connections"].as_array().unwrap().len(), 1);
    assert!(third["NextToken"].is_null());
}
4107
/// A filter and a limit combine: the first page of filtered results carries a token.
#[test]
fn list_connections_pagination_with_filter() {
    let svc = make_service();
    for i in 0..4 {
        create_connection(&svc, &format!("prod-{i:02}"));
    }
    create_connection(&svc, "dev-00");

    let request = make_request(
        "ListConnections",
        json!({ "NamePrefix": "prod-", "Limit": 2 }),
    );
    let resp = svc.list_connections(&request).unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();

    let listed = body["Connections"].as_array().unwrap();
    assert_eq!(listed.len(), 2);
    assert!(body["NextToken"].as_str().is_some());
}
4125
/// Without filters or a limit, every API destination is listed and no token is returned.
#[test]
fn list_api_destinations_returns_all_by_default() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    for name in ["dest-alpha", "dest-beta"].iter() {
        create_api_destination(&svc, name, "my-conn");
    }

    let resp = svc
        .list_api_destinations(&make_request("ListApiDestinations", json!({})))
        .unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();

    let listed = body["ApiDestinations"].as_array().unwrap();
    assert_eq!(listed.len(), 2);
    assert!(body["NextToken"].is_null());
}
4141
/// `NamePrefix` restricts the listing to API destinations with that prefix.
#[test]
fn list_api_destinations_name_prefix_filter() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    for name in ["prod-dest-1", "prod-dest-2", "dev-dest-1"].iter() {
        create_api_destination(&svc, name, "my-conn");
    }

    let resp = svc
        .list_api_destinations(&make_request(
            "ListApiDestinations",
            json!({ "NamePrefix": "prod-" }),
        ))
        .unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();

    let listed = body["ApiDestinations"].as_array().unwrap();
    assert_eq!(listed.len(), 2);
    for destination in listed {
        assert!(destination["Name"].as_str().unwrap().starts_with("prod-"));
    }
}
4162
/// `ConnectionArn` restricts the listing to destinations using that connection.
#[test]
fn list_api_destinations_connection_arn_filter() {
    let svc = make_service();
    create_connection(&svc, "conn-a");
    create_connection(&svc, "conn-b");
    let bindings = [
        ("dest-1", "conn-a"),
        ("dest-2", "conn-b"),
        ("dest-3", "conn-a"),
    ];
    for &(destination, connection) in bindings.iter() {
        create_api_destination(&svc, destination, connection);
    }

    // The read guard is a temporary and is released at the end of this statement.
    let conn_a_arn = svc
        .state
        .read()
        .connections
        .get("conn-a")
        .unwrap()
        .arn
        .clone();

    let request = make_request(
        "ListApiDestinations",
        json!({ "ConnectionArn": conn_a_arn }),
    );
    let resp = svc.list_api_destinations(&request).unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();

    let mut names: Vec<&str> = Vec::new();
    for destination in body["ApiDestinations"].as_array().unwrap() {
        names.push(destination["Name"].as_str().unwrap());
    }
    assert_eq!(names.len(), 2);
    assert!(names.contains(&"dest-1"));
    assert!(names.contains(&"dest-3"));
}
4193
/// Five API destinations with `Limit: 2` come back as pages of 2, 2 and 1,
/// with tokens `2`, `4` and then none.
#[test]
fn list_api_destinations_pagination() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    for i in 0..5 {
        create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
    }

    let fetch = |body: Value| -> Value {
        let resp = svc
            .list_api_destinations(&make_request("ListApiDestinations", body))
            .unwrap();
        serde_json::from_slice(&resp.body).unwrap()
    };

    let first = fetch(json!({ "Limit": 2 }));
    assert_eq!(first["ApiDestinations"].as_array().unwrap().len(), 2);
    let first_token = first["NextToken"].as_str().unwrap();
    assert_eq!(first_token, "2");

    let second = fetch(json!({ "Limit": 2, "NextToken": first_token }));
    assert_eq!(second["ApiDestinations"].as_array().unwrap().len(), 2);
    let second_token = second["NextToken"].as_str().unwrap();
    assert_eq!(second_token, "4");

    let third = fetch(json!({ "Limit": 2, "NextToken": second_token }));
    assert_eq!(third["ApiDestinations"].as_array().unwrap().len(), 1);
    assert!(third["NextToken"].is_null());
}
4231
4232 fn create_event_bus(svc: &EventBridgeService, name: &str) {
4235 let req = make_request("CreateEventBus", json!({ "Name": name }));
4236 svc.create_event_bus(&req).unwrap();
4237 }
4238
/// After creating four buses, `Limit: 2` yields pages of 2, 2 and 1 (five
/// buses in total are listed), with tokens `2`, `4` and then none.
#[test]
fn list_event_buses_pagination() {
    let svc = make_service();
    for i in 0..4 {
        create_event_bus(&svc, &format!("bus-{i:02}"));
    }

    let fetch = |body: Value| -> Value {
        let resp = svc
            .list_event_buses(&make_request("ListEventBuses", body))
            .unwrap();
        serde_json::from_slice(&resp.body).unwrap()
    };

    let first = fetch(json!({ "Limit": 2 }));
    assert_eq!(first["EventBuses"].as_array().unwrap().len(), 2);
    let first_token = first["NextToken"].as_str().unwrap();
    assert_eq!(first_token, "2");

    let second = fetch(json!({ "Limit": 2, "NextToken": first_token }));
    assert_eq!(second["EventBuses"].as_array().unwrap().len(), 2);
    let second_token = second["NextToken"].as_str().unwrap();
    assert_eq!(second_token, "4");

    let third = fetch(json!({ "Limit": 2, "NextToken": second_token }));
    assert_eq!(third["EventBuses"].as_array().unwrap().len(), 1);
    assert!(third["NextToken"].is_null());
}
4270
4271 #[test]
4272 fn list_event_buses_no_pagination_returns_all() {
4273 let svc = make_service();
4274 create_event_bus(&svc, "bus-alpha");
4275 create_event_bus(&svc, "bus-beta");
4276
4277 let req = make_request("ListEventBuses", json!({}));
4278 let resp = svc.list_event_buses(&req).unwrap();
4279 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4280 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4282 assert!(body["NextToken"].is_null());
4283 }
4284
4285 #[test]
4288 fn put_events_never_includes_endpoint_id_in_response() {
4289 let svc = make_service();
4290 let req = make_request(
4292 "PutEvents",
4293 json!({
4294 "EndpointId": "my-endpoint.abc123",
4295 "Entries": [{
4296 "Source": "my.source",
4297 "DetailType": "MyType",
4298 "Detail": "{}",
4299 "EventBusName": "default"
4300 }]
4301 }),
4302 );
4303 let resp = svc.put_events(&req).unwrap();
4304 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4305 assert!(
4306 !body.as_object().unwrap().contains_key("EndpointId"),
4307 "EndpointId should never be in the PutEvents response"
4308 );
4309 assert_eq!(body["FailedEntryCount"], 0);
4310 }
4311
4312 fn create_archive(svc: &EventBridgeService, name: &str) {
4315 let req = make_request(
4316 "CreateArchive",
4317 json!({
4318 "ArchiveName": name,
4319 "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4320 }),
4321 );
4322 svc.create_archive(&req).unwrap();
4323 }
4324
4325 #[test]
4326 fn list_archives_pagination() {
4327 let svc = make_service();
4328 for i in 0..5 {
4329 create_archive(&svc, &format!("archive-{i:02}"));
4330 }
4331
4332 let req = make_request("ListArchives", json!({ "Limit": 2 }));
4334 let resp = svc.list_archives(&req).unwrap();
4335 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4336 assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4337 let token = body["NextToken"].as_str().unwrap();
4338 assert_eq!(token, "2");
4339
4340 let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4342 let resp = svc.list_archives(&req).unwrap();
4343 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4344 assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4345 let token = body["NextToken"].as_str().unwrap();
4346 assert_eq!(token, "4");
4347
4348 let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4350 let resp = svc.list_archives(&req).unwrap();
4351 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4352 assert_eq!(body["Archives"].as_array().unwrap().len(), 1);
4353 assert!(body["NextToken"].is_null());
4354 }
4355
4356 fn create_replay(svc: &EventBridgeService, name: &str) {
4359 let archive_arn = {
4361 let state = svc.state.read();
4362 if state.archives.contains_key("replay-archive") {
4363 state.archives["replay-archive"].arn.clone()
4364 } else {
4365 drop(state);
4366 create_archive(svc, "replay-archive");
4367 svc.state.read().archives["replay-archive"].arn.clone()
4368 }
4369 };
4370 let req = make_request(
4371 "StartReplay",
4372 json!({
4373 "ReplayName": name,
4374 "EventSourceArn": archive_arn,
4375 "EventStartTime": 1000000.0,
4376 "EventEndTime": 2000000.0,
4377 "Destination": {
4378 "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4379 }
4380 }),
4381 );
4382 svc.start_replay(&req).unwrap();
4383 }
4384
4385 #[test]
4386 fn list_replays_pagination() {
4387 let svc = make_service();
4388 for i in 0..5 {
4389 create_replay(&svc, &format!("replay-{i:02}"));
4390 }
4391
4392 let req = make_request("ListReplays", json!({ "Limit": 2 }));
4394 let resp = svc.list_replays(&req).unwrap();
4395 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4396 assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4397 let token = body["NextToken"].as_str().unwrap();
4398 assert_eq!(token, "2");
4399
4400 let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4402 let resp = svc.list_replays(&req).unwrap();
4403 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4404 assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4405 let token = body["NextToken"].as_str().unwrap();
4406 assert_eq!(token, "4");
4407
4408 let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4410 let resp = svc.list_replays(&req).unwrap();
4411 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4412 assert_eq!(body["Replays"].as_array().unwrap().len(), 1);
4413 assert!(body["NextToken"].is_null());
4414 }
4415
4416 #[test]
4417 fn list_event_buses_invalid_next_token_returns_error() {
4418 let svc = make_service();
4419
4420 let req = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));
4421 let result = svc.list_event_buses(&req);
4422 assert!(
4423 result.is_err(),
4424 "non-numeric NextToken should return an error"
4425 );
4426 }
4427
4428 #[test]
4431 fn test_event_pattern_match() {
4432 let svc = make_service();
4433 let req = make_request(
4434 "TestEventPattern",
4435 json!({
4436 "EventPattern": r#"{"source": ["my.app"]}"#,
4437 "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4438 }),
4439 );
4440 let resp = svc.test_event_pattern(&req).unwrap();
4441 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4442 assert_eq!(body["Result"], true);
4443 }
4444
4445 #[test]
4446 fn test_event_pattern_no_match() {
4447 let svc = make_service();
4448 let req = make_request(
4449 "TestEventPattern",
4450 json!({
4451 "EventPattern": r#"{"source": ["other.app"]}"#,
4452 "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4453 }),
4454 );
4455 let resp = svc.test_event_pattern(&req).unwrap();
4456 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4457 assert_eq!(body["Result"], false);
4458 }
4459
4460 #[test]
4461 fn test_event_pattern_detail_match() {
4462 let svc = make_service();
4463 let req = make_request(
4464 "TestEventPattern",
4465 json!({
4466 "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
4467 "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
4468 }),
4469 );
4470 let resp = svc.test_event_pattern(&req).unwrap();
4471 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4472 assert_eq!(body["Result"], true);
4473 }
4474
4475 #[test]
4478 fn update_event_bus_description() {
4479 let svc = make_service();
4480 create_event_bus(&svc, "my-bus");
4481
4482 let req = make_request(
4483 "UpdateEventBus",
4484 json!({ "Name": "my-bus", "Description": "Updated desc" }),
4485 );
4486 let resp = svc.update_event_bus(&req).unwrap();
4487 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4488 assert_eq!(body["Name"], "my-bus");
4489
4490 let req = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
4492 let resp = svc.describe_event_bus(&req).unwrap();
4493 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4494 assert_eq!(body["Description"], "Updated desc");
4495 }
4496
4497 #[test]
4498 fn update_event_bus_not_found() {
4499 let svc = make_service();
4500 let req = make_request(
4501 "UpdateEventBus",
4502 json!({ "Name": "ghost-bus", "Description": "nope" }),
4503 );
4504 assert!(svc.update_event_bus(&req).is_err());
4505 }
4506
4507 fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
4510 let req = make_request(
4511 "CreateEndpoint",
4512 json!({
4513 "Name": name,
4514 "RoutingConfig": {
4515 "FailoverConfig": {
4516 "Primary": { "HealthCheck": "" },
4517 "Secondary": { "Route": "us-west-2" }
4518 }
4519 },
4520 "EventBuses": [
4521 { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
4522 ]
4523 }),
4524 );
4525 svc.create_endpoint(&req).unwrap();
4526 }
4527
4528 #[test]
4529 fn endpoint_create_describe_delete() {
4530 let svc = make_service();
4531 create_endpoint_helper(&svc, "my-endpoint");
4532
4533 let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4535 let resp = svc.describe_endpoint(&req).unwrap();
4536 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4537 assert_eq!(body["Name"], "my-endpoint");
4538 assert_eq!(body["State"], "ACTIVE");
4539 assert!(body["EndpointId"].as_str().unwrap().contains("my-endpoint"));
4540
4541 let req = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
4543 svc.delete_endpoint(&req).unwrap();
4544
4545 let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4547 assert!(svc.describe_endpoint(&req).is_err());
4548 }
4549
4550 #[test]
4551 fn endpoint_list_and_update() {
4552 let svc = make_service();
4553 create_endpoint_helper(&svc, "ep-alpha");
4554 create_endpoint_helper(&svc, "ep-beta");
4555
4556 let req = make_request("ListEndpoints", json!({}));
4558 let resp = svc.list_endpoints(&req).unwrap();
4559 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4560 assert_eq!(body["Endpoints"].as_array().unwrap().len(), 2);
4561
4562 let req = make_request(
4564 "UpdateEndpoint",
4565 json!({ "Name": "ep-alpha", "Description": "updated" }),
4566 );
4567 let resp = svc.update_endpoint(&req).unwrap();
4568 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4569 assert_eq!(body["Name"], "ep-alpha");
4570
4571 let req = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
4573 let resp = svc.describe_endpoint(&req).unwrap();
4574 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4575 assert_eq!(body["Description"], "updated");
4576 }
4577
4578 #[test]
4579 fn endpoint_duplicate_fails() {
4580 let svc = make_service();
4581 create_endpoint_helper(&svc, "dup-ep");
4582 let req = make_request(
4583 "CreateEndpoint",
4584 json!({
4585 "Name": "dup-ep",
4586 "RoutingConfig": {},
4587 "EventBuses": []
4588 }),
4589 );
4590 assert!(svc.create_endpoint(&req).is_err());
4591 }
4592
4593 #[test]
4596 fn deauthorize_connection_sets_state() {
4597 let svc = make_service();
4598 create_connection(&svc, "deauth-conn");
4599
4600 let req = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
4601 let resp = svc.deauthorize_connection(&req).unwrap();
4602 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4603 assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4604 assert!(body["ConnectionArn"]
4605 .as_str()
4606 .unwrap()
4607 .contains("deauth-conn"));
4608
4609 let req = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
4611 let resp = svc.describe_connection(&req).unwrap();
4612 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4613 assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4614 }
4615
4616 #[test]
4617 fn deauthorize_connection_not_found() {
4618 let svc = make_service();
4619 let req = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
4620 assert!(svc.deauthorize_connection(&req).is_err());
4621 }
4622
4623 #[test]
4626 fn partner_event_source_crud() {
4627 let svc = make_service();
4628
4629 let req = make_request(
4631 "CreatePartnerEventSource",
4632 json!({ "Name": "partner/test", "Account": "123456789012" }),
4633 );
4634 svc.create_partner_event_source(&req).unwrap();
4635
4636 let req = make_request(
4638 "DescribePartnerEventSource",
4639 json!({ "Name": "partner/test" }),
4640 );
4641 let resp = svc.describe_partner_event_source(&req).unwrap();
4642 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4643 assert_eq!(body["Name"], "partner/test");
4644
4645 let req = make_request("ListPartnerEventSources", json!({}));
4647 let resp = svc.list_partner_event_sources(&req).unwrap();
4648 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4649 assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);
4650
4651 let req = make_request(
4653 "ListPartnerEventSourceAccounts",
4654 json!({ "EventSourceName": "partner/test" }),
4655 );
4656 let resp = svc.list_partner_event_source_accounts(&req).unwrap();
4657 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4658 assert_eq!(
4659 body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
4660 1
4661 );
4662
4663 let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
4665 let resp = svc.describe_event_source(&req).unwrap();
4666 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4667 assert_eq!(body["Name"], "partner/test");
4668 assert_eq!(body["State"], "ACTIVE");
4669
4670 let req = make_request("ListEventSources", json!({}));
4672 let resp = svc.list_event_sources(&req).unwrap();
4673 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4674 assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);
4675
4676 let req = make_request(
4678 "DeletePartnerEventSource",
4679 json!({ "Name": "partner/test", "Account": "123456789012" }),
4680 );
4681 svc.delete_partner_event_source(&req).unwrap();
4682
4683 let req = make_request(
4685 "DescribePartnerEventSource",
4686 json!({ "Name": "partner/test" }),
4687 );
4688 assert!(svc.describe_partner_event_source(&req).is_err());
4689 }
4690
4691 #[test]
4692 fn activate_deactivate_event_source() {
4693 let svc = make_service();
4694
4695 let req = make_request(
4697 "CreatePartnerEventSource",
4698 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
4699 );
4700 svc.create_partner_event_source(&req).unwrap();
4701
4702 let req = make_request(
4704 "DeactivateEventSource",
4705 json!({ "Name": "aws.partner/test" }),
4706 );
4707 svc.deactivate_event_source(&req).unwrap();
4708 {
4709 let state = svc.state.read();
4710 assert_eq!(
4711 state.partner_event_sources["aws.partner/test"].state,
4712 "INACTIVE"
4713 );
4714 }
4715
4716 let req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
4718 svc.activate_event_source(&req).unwrap();
4719 {
4720 let state = svc.state.read();
4721 assert_eq!(
4722 state.partner_event_sources["aws.partner/test"].state,
4723 "ACTIVE"
4724 );
4725 }
4726
4727 let req = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
4729 assert!(svc.activate_event_source(&req).is_err());
4730
4731 let req = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
4732 assert!(svc.deactivate_event_source(&req).is_err());
4733 }
4734
4735 #[test]
4736 fn delete_partner_event_source_verifies_account() {
4737 let svc = make_service();
4738
4739 let req = make_request(
4741 "CreatePartnerEventSource",
4742 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
4743 );
4744 svc.create_partner_event_source(&req).unwrap();
4745
4746 let req = make_request(
4748 "DeletePartnerEventSource",
4749 json!({ "Name": "aws.partner/test", "Account": "999999999999" }),
4750 );
4751 assert!(svc.delete_partner_event_source(&req).is_err());
4752 assert!(svc
4754 .state
4755 .read()
4756 .partner_event_sources
4757 .contains_key("aws.partner/test"));
4758
4759 let req = make_request(
4761 "DeletePartnerEventSource",
4762 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
4763 );
4764 svc.delete_partner_event_source(&req).unwrap();
4765 assert!(!svc
4766 .state
4767 .read()
4768 .partner_event_sources
4769 .contains_key("aws.partner/test"));
4770
4771 let req = make_request(
4773 "DeletePartnerEventSource",
4774 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
4775 );
4776 assert!(svc.delete_partner_event_source(&req).is_err());
4777 }
4778
4779 #[test]
4780 fn put_partner_events() {
4781 let svc = make_service();
4782 let req = make_request(
4783 "PutPartnerEvents",
4784 json!({
4785 "Entries": [
4786 { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
4787 ]
4788 }),
4789 );
4790 let resp = svc.put_partner_events(&req).unwrap();
4791 let body: Value = serde_json::from_slice(&resp.body).unwrap();
4792 assert_eq!(body["FailedEntryCount"], 0);
4793 assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
4794 assert!(body["Entries"][0]["EventId"].as_str().is_some());
4795 }
4796}