1use super::*;
2
3pub(crate) fn validate_put_events_entry(
8 source: &str,
9 detail_type: &str,
10 detail: &str,
11) -> Result<(), Value> {
12 if source.is_empty() {
13 return Err(json!({
14 "ErrorCode": "InvalidArgument",
15 "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
16 }));
17 }
18 if detail_type.is_empty() {
19 return Err(json!({
20 "ErrorCode": "InvalidArgument",
21 "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
22 }));
23 }
24 if detail.is_empty() {
25 return Err(json!({
26 "ErrorCode": "InvalidArgument",
27 "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
28 }));
29 }
30 if serde_json::from_str::<Value>(detail).is_err() {
31 return Err(json!({
32 "ErrorCode": "MalformedDetail",
33 "ErrorMessage": "Detail is malformed.",
34 }));
35 }
36 Ok(())
37}
38
39pub(crate) fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
44 if let Some(s) = raw.as_str() {
45 return DateTime::parse_from_rfc3339(s)
46 .map(|dt| dt.with_timezone(&Utc))
47 .unwrap_or_else(|_| Utc::now());
48 }
49 if let Some(ts) = raw.as_f64() {
50 return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
51 .unwrap_or_else(Utc::now);
52 }
53 if let Some(ts) = raw.as_i64() {
54 return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
55 }
56 Utc::now()
57}
58
/// Reports whether `action` is one of the listed state-changing API actions.
///
/// The comparison is exact and case-sensitive; any action not in the table
/// yields `false`.
pub(crate) fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateEventBus",
        "DeleteEventBus",
        "UpdateEventBus",
        "PutRule",
        "DeleteRule",
        "EnableRule",
        "DisableRule",
        "PutTargets",
        "RemoveTargets",
        "PutEvents",
        "PutPermission",
        "RemovePermission",
        "TagResource",
        "UntagResource",
        "CreateArchive",
        "UpdateArchive",
        "DeleteArchive",
        "CreateConnection",
        "UpdateConnection",
        "DeleteConnection",
        "DeauthorizeConnection",
        "CreateApiDestination",
        "UpdateApiDestination",
        "DeleteApiDestination",
        "StartReplay",
        "CancelReplay",
        "CreatePartnerEventSource",
        "DeletePartnerEventSource",
        "ActivateEventSource",
        "DeactivateEventSource",
        "PutPartnerEvents",
        "CreateEndpoint",
        "DeleteEndpoint",
        "UpdateEndpoint",
    ];

    MUTATING_ACTIONS.contains(&action)
}
99
100pub(crate) fn parse_tags(body: &Value) -> BTreeMap<String, String> {
101 let mut tags = BTreeMap::new();
102 if let Some(arr) = body["Tags"].as_array() {
103 for tag in arr {
104 if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
105 tags.insert(key.to_string(), val.to_string());
106 }
107 }
108 }
109 tags
110}
111
112pub fn parse_target(target: &Value) -> EventTarget {
113 EventTarget {
114 id: target["Id"].as_str().unwrap_or("").to_string(),
115 arn: target["Arn"].as_str().unwrap_or("").to_string(),
116 input: target["Input"].as_str().map(|s| s.to_string()),
117 input_path: target["InputPath"].as_str().map(|s| s.to_string()),
118 input_transformer: target.get("InputTransformer").cloned(),
119 sqs_parameters: target.get("SqsParameters").cloned(),
120 role_arn: target["RoleArn"].as_str().map(|s| s.to_string()),
121 dead_letter_config: target.get("DeadLetterConfig").cloned(),
122 retry_policy: target.get("RetryPolicy").cloned(),
123 ecs_parameters: target.get("EcsParameters").cloned(),
124 batch_parameters: target.get("BatchParameters").cloned(),
125 kinesis_parameters: target.get("KinesisParameters").cloned(),
126 redshift_data_parameters: target.get("RedshiftDataParameters").cloned(),
127 http_parameters: target.get("HttpParameters").cloned(),
128 sage_maker_pipeline_parameters: target.get("SageMakerPipelineParameters").cloned(),
129 app_sync_parameters: target.get("AppSyncParameters").cloned(),
130 run_command_parameters: target.get("RunCommandParameters").cloned(),
131 }
132}
133
134pub(crate) fn target_to_json(t: &EventTarget) -> Value {
135 let mut obj = json!({ "Id": t.id, "Arn": t.arn });
136 if let Some(ref input) = t.input {
137 obj["Input"] = json!(input);
138 }
139 if let Some(ref input_path) = t.input_path {
140 obj["InputPath"] = json!(input_path);
141 }
142 if let Some(ref it) = t.input_transformer {
143 obj["InputTransformer"] = it.clone();
144 }
145 if let Some(ref sp) = t.sqs_parameters {
146 obj["SqsParameters"] = sp.clone();
147 }
148 if let Some(ref ra) = t.role_arn {
149 obj["RoleArn"] = json!(ra);
150 }
151 if let Some(ref dlc) = t.dead_letter_config {
152 obj["DeadLetterConfig"] = dlc.clone();
153 }
154 if let Some(ref rp) = t.retry_policy {
155 obj["RetryPolicy"] = rp.clone();
156 }
157 if let Some(ref p) = t.ecs_parameters {
158 obj["EcsParameters"] = p.clone();
159 }
160 if let Some(ref p) = t.batch_parameters {
161 obj["BatchParameters"] = p.clone();
162 }
163 if let Some(ref p) = t.kinesis_parameters {
164 obj["KinesisParameters"] = p.clone();
165 }
166 if let Some(ref p) = t.redshift_data_parameters {
167 obj["RedshiftDataParameters"] = p.clone();
168 }
169 if let Some(ref p) = t.http_parameters {
170 obj["HttpParameters"] = p.clone();
171 }
172 if let Some(ref p) = t.sage_maker_pipeline_parameters {
173 obj["SageMakerPipelineParameters"] = p.clone();
174 }
175 if let Some(ref p) = t.app_sync_parameters {
176 obj["AppSyncParameters"] = p.clone();
177 }
178 if let Some(ref p) = t.run_command_parameters {
179 obj["RunCommandParameters"] = p.clone();
180 }
181 obj
182}
183
184pub(crate) fn find_tags_mut<'a>(
185 state: &'a mut crate::state::EventBridgeState,
186 arn: &str,
187) -> Result<&'a mut BTreeMap<String, String>, AwsServiceError> {
188 for bus in state.buses.values_mut() {
190 if bus.arn == arn {
191 return Ok(&mut bus.tags);
192 }
193 }
194 for rule in state.rules.values_mut() {
196 if rule.arn == arn {
197 return Ok(&mut rule.tags);
198 }
199 }
200
201 let error_msg = if arn.contains(":rule/") {
203 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
205 if let Some(rule_path) = parts.first() {
206 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
207 format!("Rule {rule_name} does not exist on EventBus {bus}.")
208 } else {
209 format!("Rule {} does not exist on EventBus default.", rule_path)
210 }
211 } else {
212 format!("Resource {arn} not found.")
213 }
214 } else {
215 format!("Resource {arn} not found.")
216 };
217
218 Err(AwsServiceError::aws_error(
219 StatusCode::BAD_REQUEST,
220 "ResourceNotFoundException",
221 error_msg,
222 ))
223}
224
225pub(crate) fn find_tags<'a>(
226 state: &'a crate::state::EventBridgeState,
227 arn: &str,
228) -> Result<&'a BTreeMap<String, String>, AwsServiceError> {
229 for bus in state.buses.values() {
230 if bus.arn == arn {
231 return Ok(&bus.tags);
232 }
233 }
234 for rule in state.rules.values() {
235 if rule.arn == arn {
236 return Ok(&rule.tags);
237 }
238 }
239
240 let error_msg = if arn.contains(":rule/") {
241 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
242 if let Some(rule_path) = parts.first() {
243 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
244 format!("Rule {rule_name} does not exist on EventBus {bus}.")
245 } else {
246 format!("Rule {} does not exist on EventBus default.", rule_path)
247 }
248 } else {
249 format!("Resource {arn} not found.")
250 }
251 } else {
252 format!("Resource {arn} not found.")
253 };
254
255 Err(AwsServiceError::aws_error(
256 StatusCode::BAD_REQUEST,
257 "ResourceNotFoundException",
258 error_msg,
259 ))
260}
261
262pub(crate) fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
263 let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
264 AwsServiceError::aws_error(
265 StatusCode::BAD_REQUEST,
266 "InvalidEventPatternException",
267 "Event pattern is not valid. Reason: Invalid JSON",
268 )
269 })?;
270
271 validate_pattern_values(&parsed, "")?;
272 Ok(())
273}
274
275pub(crate) fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
276 match value {
277 Value::Object(obj) => {
278 for (key, val) in obj {
279 let new_path = if path.is_empty() {
280 key.clone()
281 } else {
282 format!("{path}.{key}")
283 };
284 match val {
285 Value::Object(_) => validate_pattern_values(val, &new_path)?,
286 Value::Array(_) => {} _ => {
288 return Err(AwsServiceError::aws_error(
289 StatusCode::BAD_REQUEST,
290 "InvalidEventPatternException",
291 format!(
292 "Event pattern is not valid. Reason: '{}' must be an object or an array",
293 key
294 ),
295 ));
296 }
297 }
298 }
299 Ok(())
300 }
301 _ => Ok(()),
302 }
303}
304
305pub(crate) fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
306 let mut resp = match auth_type {
307 "API_KEY" => {
308 let mut resp = json!({});
309 if let Some(api_key) = params.get("ApiKeyAuthParameters") {
310 resp["ApiKeyAuthParameters"] = json!({
311 "ApiKeyName": api_key["ApiKeyName"],
312 });
313 }
314 resp
315 }
316 "BASIC" => {
317 let mut resp = json!({});
318 if let Some(basic) = params.get("BasicAuthParameters") {
319 resp["BasicAuthParameters"] = json!({
320 "Username": basic["Username"],
321 });
322 }
323 resp
324 }
325 "OAUTH_CLIENT_CREDENTIALS" => {
326 let mut resp = json!({});
327 if let Some(oauth) = params.get("OAuthParameters") {
328 resp["OAuthParameters"] = json!({
329 "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
330 "HttpMethod": oauth["HttpMethod"],
331 "ClientParameters": {
332 "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
333 },
334 });
335 }
336 resp
337 }
338 _ => return params.clone(),
339 };
340
341 if let Some(inv) = params.get("InvocationHttpParameters") {
347 resp["InvocationHttpParameters"] = sanitize_invocation_http_params(inv);
348 }
349 resp
350}
351
352fn sanitize_invocation_http_params(inv: &Value) -> Value {
356 let mut out = json!({});
357 for field in [
358 "HeaderParameters",
359 "QueryStringParameters",
360 "BodyParameters",
361 ] {
362 if let Some(arr) = inv.get(field).and_then(|v| v.as_array()) {
363 let sanitized: Vec<Value> = arr
364 .iter()
365 .map(|p| {
366 let is_secret = p
367 .get("IsValueSecret")
368 .and_then(|v| v.as_bool())
369 .unwrap_or(true);
370 let mut entry = json!({
371 "Key": p.get("Key").cloned().unwrap_or(Value::Null),
372 "IsValueSecret": is_secret,
373 });
374 if !is_secret {
375 if let Some(val) = p.get("Value") {
376 entry["Value"] = val.clone();
377 }
378 }
379 entry
380 })
381 .collect();
382 out[field] = json!(sanitized);
383 }
384 }
385 out
386}
387
388pub(crate) fn matches_pattern(
390 pattern_json: Option<&str>,
391 source: &str,
392 detail_type: &str,
393 detail: &str,
394 account: &str,
395 region: &str,
396 resources: &[String],
397) -> bool {
398 let pattern_json = match pattern_json {
399 Some(p) => p,
400 None => return true,
401 };
402
403 let pattern: Value = match serde_json::from_str(pattern_json) {
404 Ok(v) => v,
405 Err(_) => return false,
406 };
407
408 if !pattern.is_object() {
409 return false;
410 }
411
412 let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
413 let event = json!({
414 "source": source,
415 "detail-type": detail_type,
416 "detail": detail_value,
417 "account": account,
418 "region": region,
419 "resources": resources,
420 });
421
422 matches_value(&pattern, &event)
423}
424
425pub(crate) fn matches_value(pattern: &Value, event_value: &Value) -> bool {
426 match pattern {
427 Value::Object(obj) => {
428 if let Some(Value::Array(alternatives)) = obj.get("$or") {
431 return alternatives
432 .iter()
433 .any(|alt| matches_value(alt, event_value));
434 }
435 for (key, sub_pattern) in obj {
436 if key == "$or" {
437 continue;
438 }
439 let sub_value = &event_value[key];
440 if !matches_value(sub_pattern, sub_value) {
441 return false;
442 }
443 }
444 true
445 }
446 Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
447 _ => false,
448 }
449}
450
451pub(crate) fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
452 match pattern_elem {
453 Value::Object(obj) => {
454 if let Some(prefix_val) = obj.get("prefix") {
455 if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
456 return actual.starts_with(prefix);
457 }
458 return false;
459 }
460 if let Some(suffix_val) = obj.get("suffix") {
461 if let (Some(suffix), Some(actual)) = (suffix_val.as_str(), event_value.as_str()) {
462 return actual.ends_with(suffix);
463 }
464 return false;
465 }
466 if let Some(eqic_val) = obj.get("equals-ignore-case") {
467 if let (Some(expected), Some(actual)) = (eqic_val.as_str(), event_value.as_str()) {
468 return expected.eq_ignore_ascii_case(actual);
469 }
470 return false;
471 }
472 if let Some(cidr_val) = obj.get("cidr") {
473 if let (Some(cidr), Some(actual)) = (cidr_val.as_str(), event_value.as_str()) {
474 return cidr_matches(cidr, actual);
475 }
476 return false;
477 }
478 if let Some(wild_val) = obj.get("wildcard") {
479 if let (Some(pattern), Some(actual)) = (wild_val.as_str(), event_value.as_str()) {
480 return wildcard_matches(pattern, actual);
481 }
482 return false;
483 }
484 if let Some(exists_val) = obj.get("exists") {
485 let should_exist = exists_val.as_bool().unwrap_or(true);
486 let does_exist = !event_value.is_null();
487 return should_exist == does_exist;
488 }
489 if let Some(anything_but_val) = obj.get("anything-but") {
490 return match anything_but_val {
491 Value::String(s) => event_value.as_str() != Some(s.as_str()),
492 Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
493 Value::Number(_) => event_value != anything_but_val,
494 Value::Object(nested) => {
500 let fv = event_value.as_str();
501 if let Some(p) = nested.get("prefix").and_then(|v| v.as_str()) {
502 !fv.is_some_and(|s| s.starts_with(p))
503 } else if let Some(suf) = nested.get("suffix").and_then(|v| v.as_str()) {
504 !fv.is_some_and(|s| s.ends_with(suf))
505 } else if let Some(eic) =
506 nested.get("equals-ignore-case").and_then(|v| v.as_str())
507 {
508 !fv.is_some_and(|s| s.eq_ignore_ascii_case(eic))
509 } else {
510 true
512 }
513 }
514 _ => true,
515 };
516 }
517 if let Some(numeric_val) = obj.get("numeric") {
518 return matches_numeric(numeric_val, event_value);
519 }
520 false
521 }
522 _ => values_equal(pattern_elem, event_value),
523 }
524}
525
/// Matches `actual` against a wildcard `pattern`.
///
/// `*` stands for any run of characters, including none. A backslash makes
/// the following character literal (so `\*` is a literal asterisk); a
/// trailing backslash with nothing after it is ignored. All other characters
/// must match exactly.
pub(crate) fn wildcard_matches(pattern: &str, actual: &str) -> bool {
    // Split the pattern into the literal pieces between unescaped `*`s.
    let mut literals: Vec<String> = vec![String::new()];
    let mut escaped = false;
    for ch in pattern.chars() {
        if escaped {
            escaped = false;
        } else if ch == '\\' {
            escaped = true;
            continue;
        } else if ch == '*' {
            literals.push(String::new());
            continue;
        }
        if let Some(piece) = literals.last_mut() {
            piece.push(ch);
        }
    }

    match literals.as_slice() {
        // Cannot happen (the list starts with one piece), but must be covered.
        [] => false,
        // No wildcard at all: plain equality.
        [only] => only.as_str() == actual,
        [head, middle @ .., tail] => {
            let Some(mut remaining) = actual.strip_prefix(head.as_str()) else {
                return false;
            };
            // Each inner piece takes its leftmost occurrence, which leaves the
            // most text available for the pieces after it.
            for piece in middle {
                match remaining.find(piece.as_str()) {
                    Some(at) => remaining = &remaining[at + piece.len()..],
                    None => return false,
                }
            }
            remaining.ends_with(tail.as_str())
        }
    }
}
572
/// Reports whether the IP address `actual` falls inside the block `cidr`.
///
/// `cidr` must have the form `<network>/<prefix-length>`. IPv4 blocks
/// (prefix 0-32, dotted-quad addresses) and IPv6 blocks (prefix 0-128) are
/// both supported; the block is treated as IPv6 when its network part
/// contains a `:`. A malformed block, a malformed address, an out-of-range
/// prefix, or an address of the other family all yield `false`.
pub(crate) fn cidr_matches(cidr: &str, actual: &str) -> bool {
    let Some((net_str, prefix_str)) = cidr.split_once('/') else {
        return false;
    };
    let Some(prefix) = parse_unsigned_decimal(prefix_str) else {
        return false;
    };

    if net_str.contains(':') {
        if prefix > 128 {
            return false;
        }
        let (Ok(net), Ok(value)) = (
            net_str.parse::<std::net::Ipv6Addr>(),
            actual.parse::<std::net::Ipv6Addr>(),
        ) else {
            return false;
        };
        // A /0 block would shift by the full width; `checked_shl` rejects
        // that, and the resulting all-zero mask matches every address.
        let mask = u128::MAX.checked_shl(128 - prefix).unwrap_or(0);
        return (u128::from(net) & mask) == (u128::from(value) & mask);
    }

    if prefix > 32 {
        return false;
    }
    let (Some(net), Some(value)) = (parse_ipv4(net_str), parse_ipv4(actual)) else {
        return false;
    };
    let mask = u32::MAX.checked_shl(32 - prefix).unwrap_or(0);
    (net & mask) == (value & mask)
}

/// Parses a dotted-quad IPv4 address into its 32-bit big-endian value.
///
/// Exactly four octets are required, each made only of ASCII digits and no
/// larger than 255.
fn parse_ipv4(s: &str) -> Option<u32> {
    let mut octets = s.split('.');
    let mut packed: u32 = 0;
    for _ in 0..4 {
        let octet = parse_unsigned_decimal(octets.next()?)?;
        if octet > 255 {
            return None;
        }
        packed = (packed << 8) | octet;
    }
    // Anything after the fourth octet makes the address invalid.
    octets.next().is_none().then_some(packed)
}

/// Parses a non-empty run of ASCII digits as a `u32`.
///
/// `str::parse::<u32>` on its own also accepts a leading `+`, which is not
/// valid in an IP octet or a prefix length.
fn parse_unsigned_decimal(s: &str) -> Option<u32> {
    if s.is_empty() || !s.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    s.parse().ok()
}
613
614#[allow(clippy::too_many_arguments)]
618pub(crate) fn archive_matching_event(
619 state: &mut crate::state::EventBridgeState,
620 event: &PutEvent,
621 event_bus_name: &str,
622 source: &str,
623 detail_type: &str,
624 detail: &str,
625 account_id: &str,
626 region: &str,
627 resources: &[String],
628) {
629 let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
630 for akey in archive_keys {
631 let (archive_bus, archive_pattern, archive_enabled) = {
632 let a = &state.archives[&akey];
633 (
634 state.resolve_bus_name(&a.event_source_arn),
635 a.event_pattern.clone(),
636 a.state == "ENABLED",
637 )
638 };
639 if archive_bus != event_bus_name || !archive_enabled {
640 continue;
641 }
642 let pattern_matches = matches_pattern(
643 archive_pattern.as_deref(),
644 source,
645 detail_type,
646 detail,
647 account_id,
648 region,
649 resources,
650 );
651 if !pattern_matches {
652 continue;
653 }
654 if let Some(archive) = state.archives.get_mut(&akey) {
655 archive.event_count += 1;
656 archive.size_bytes += detail.len() as i64;
657 archive.events.push(event.clone());
658 }
659 }
660}
661
662#[allow(clippy::too_many_arguments)]
667pub(crate) fn collect_replay_events_with_targets(
668 state: &crate::state::EventBridgeState,
669 archive_name: &str,
670 bus_name: &str,
671 event_start_time: DateTime<Utc>,
672 event_end_time: DateTime<Utc>,
673 account_id: &str,
674 region: &str,
675) -> Vec<(PutEvent, Vec<EventTarget>)> {
676 let Some(archive) = state.archives.get(archive_name) else {
677 return Vec::new();
678 };
679
680 let replay_events: Vec<PutEvent> = archive
681 .events
682 .iter()
683 .filter(|e| e.time >= event_start_time && e.time < event_end_time)
684 .cloned()
685 .collect();
686
687 let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
688 for event in replay_events {
689 let matching_targets: Vec<EventTarget> = state
690 .rules
691 .values()
692 .filter(|r| {
693 r.event_bus_name == bus_name
694 && r.state == "ENABLED"
695 && matches_pattern(
696 r.event_pattern.as_deref(),
697 &event.source,
698 &event.detail_type,
699 &event.detail,
700 account_id,
701 region,
702 &event.resources,
703 )
704 })
705 .flat_map(|r| r.targets.clone())
706 .collect();
707
708 if !matching_targets.is_empty() {
709 events_to_deliver.push((event, matching_targets));
710 }
711 }
712 events_to_deliver
713}
714
715pub(crate) fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
716 let arr = match numeric_arr.as_array() {
717 Some(a) => a,
718 None => return false,
719 };
720 let actual = match event_value.as_f64() {
721 Some(n) => n,
722 None => return false,
723 };
724 let mut i = 0;
725 while i + 1 < arr.len() {
726 let op = match arr[i].as_str() {
727 Some(s) => s,
728 None => return false,
729 };
730 let threshold = match arr[i + 1].as_f64() {
731 Some(n) => n,
732 None => return false,
733 };
734 let ok = match op {
735 ">" => actual > threshold,
736 ">=" => actual >= threshold,
737 "<" => actual < threshold,
738 "<=" => actual <= threshold,
739 "=" => (actual - threshold).abs() < f64::EPSILON,
740 _ => return false,
741 };
742 if !ok {
743 return false;
744 }
745 i += 2;
746 }
747 true
748}
749
750pub(crate) fn values_equal(a: &Value, b: &Value) -> bool {
751 a == b
752}
753
754pub(crate) fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
756 let path = path.strip_prefix('$').unwrap_or(path);
757 let mut current = event;
758 for segment in path.split('.') {
759 if segment.is_empty() {
760 continue;
761 }
762 current = current.get(segment)?;
763 }
764 Some(current.clone())
765}
766
767pub(crate) fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
769 let input_paths_map = transformer
770 .get("InputPathsMap")
771 .and_then(|v| v.as_object())
772 .cloned()
773 .unwrap_or_default();
774 let template = transformer
775 .get("InputTemplate")
776 .and_then(|v| v.as_str())
777 .unwrap_or("")
778 .to_string();
779
780 let mut resolved: HashMap<String, Value> = HashMap::new();
782 for (var_name, path_val) in &input_paths_map {
783 if let Some(path_str) = path_val.as_str() {
784 if let Some(val) = resolve_json_path(event, path_str) {
785 resolved.insert(var_name.clone(), val);
786 }
787 }
788 }
789
790 let mut result = template;
792 for (var_name, val) in &resolved {
793 let placeholder = format!("<{var_name}>");
794 let replacement = match val {
795 Value::String(s) => s.clone(),
796 other => other.to_string(),
797 };
798 result = result.replace(&placeholder, &replacement);
799 }
800
801 result
802}
803
804pub(crate) fn missing(name: &str) -> AwsServiceError {
805 AwsServiceError::aws_error(
806 StatusCode::BAD_REQUEST,
807 "ValidationException",
808 format!("The request must contain the parameter {name}"),
809 )
810}
811
/// Extracts the function name from a Lambda function ARN.
///
/// When the sixth colon-separated field is `function` and a seventh field
/// exists, that seventh field is returned (any later fields, such as a
/// qualifier, are ignored). Every other input is returned unchanged.
pub(crate) fn function_name_from_arn(arn: &str) -> &str {
    // Fields 0..=4 are skipped; the next two are the resource type and name.
    let mut fields = arn.split(':').skip(5);
    match (fields.next(), fields.next()) {
        (Some("function"), Some(name)) => name,
        _ => arn,
    }
}
824
825pub(crate) fn invoke_lambda_async(
828 container_runtime: &Option<Arc<ContainerRuntime>>,
829 lambda_state: &Option<SharedLambdaState>,
830 function_arn: &str,
831 payload: &str,
832) {
833 let runtime = match container_runtime {
834 Some(rt) => rt.clone(),
835 None => return,
836 };
837 let lambda_state = match lambda_state {
838 Some(ls) => ls.clone(),
839 None => return,
840 };
841 let func_name = function_name_from_arn(function_arn).to_string();
842 let payload = payload.as_bytes().to_vec();
843
844 tokio::spawn(async move {
845 let resolved = {
846 let accounts = lambda_state.read();
847 let state = accounts.default_ref();
848 state.functions.get(&func_name).cloned().map(|func| {
849 let mut layer_zips: Vec<Vec<u8>> = Vec::with_capacity(func.layers.len());
850 for attached in &func.layers {
851 if let Some(bytes) = fakecloud_lambda::extras::parse_layer_version_arn(
852 &attached.arn,
853 )
854 .and_then(|(acct, name, ver)| {
855 accounts
856 .get(&acct)
857 .and_then(|s| s.layers.get(&name))
858 .and_then(|l| l.versions.iter().find(|v| v.version == ver))
859 .and_then(|v| v.code_zip.clone())
860 }) {
861 layer_zips.push(bytes);
862 }
863 }
864 (func, layer_zips)
865 })
866 };
867 let (func, layer_zips) = match resolved {
868 Some(pair) => pair,
869 None => {
870 tracing::warn!(
871 function = %func_name,
872 "EventBridge Lambda target not found, skipping invocation"
873 );
874 return;
875 }
876 };
877 match runtime.invoke(&func, &payload, &layer_zips).await {
878 Ok(_) => {
879 tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
880 }
881 Err(e) => {
882 tracing::warn!(
883 function = %func_name,
884 error = %e,
885 "EventBridge Lambda invocation failed"
886 );
887 }
888 }
889 });
890}
891
892pub(crate) fn deliver_to_logs(
895 logs_state: &SharedLogsState,
896 log_group_arn: &str,
897 payload: &str,
898 timestamp: chrono::DateTime<chrono::Utc>,
899) {
900 let group_name = if log_group_arn.contains(":log-group:") {
903 log_group_arn
904 .split(":log-group:")
905 .nth(1)
906 .unwrap_or(log_group_arn)
907 .trim_end_matches(":*")
908 } else {
909 log_group_arn
910 };
911
912 let stream_name = "events".to_string();
913 let ts_millis = timestamp.timestamp_millis();
914
915 let mut accounts = logs_state.write();
916 let state = accounts.default_mut();
917 let region = state.region.clone();
918 let account_id = state.account_id.clone();
919
920 let group = state
922 .log_groups
923 .entry(group_name.to_string())
924 .or_insert_with(|| fakecloud_logs::LogGroup {
925 name: group_name.to_string(),
926 arn: Arn::new(
927 "logs",
928 ®ion,
929 &account_id,
930 &format!("log-group:{group_name}"),
931 )
932 .to_string(),
933 creation_time: ts_millis,
934 retention_in_days: None,
935 kms_key_id: None,
936 tags: std::collections::BTreeMap::new(),
937 log_streams: std::collections::BTreeMap::new(),
938 stored_bytes: 0,
939 subscription_filters: Vec::new(),
940 data_protection_policy: None,
941 index_policies: Vec::new(),
942 transformer: None,
943 deletion_protection: false,
944 log_group_class: Some("STANDARD".to_string()),
945 });
946
947 let stream = group
948 .log_streams
949 .entry(stream_name.clone())
950 .or_insert_with(|| fakecloud_logs::LogStream {
951 name: stream_name,
952 arn: format!("{}:log-stream:events", group.arn),
953 creation_time: ts_millis,
954 first_event_timestamp: None,
955 last_event_timestamp: None,
956 last_ingestion_time: None,
957 upload_sequence_token: "1".to_string(),
958 events: Vec::new(),
959 });
960
961 stream.events.push(fakecloud_logs::LogEvent {
962 timestamp: ts_millis,
963 message: payload.to_string(),
964 ingestion_time: ts_millis,
965 });
966 stream.last_event_timestamp = Some(ts_millis);
967 stream.last_ingestion_time = Some(ts_millis);
968 if stream.first_event_timestamp.is_none() {
969 stream.first_event_timestamp = Some(ts_millis);
970 }
971}
972
973pub(crate) fn apply_connection_auth(
975 mut builder: reqwest::RequestBuilder,
976 conn: &Connection,
977) -> reqwest::RequestBuilder {
978 match conn.authorization_type.as_str() {
979 "API_KEY" => {
980 if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
981 if let (Some(name), Some(value)) = (
982 params["ApiKeyName"].as_str(),
983 params["ApiKeyValue"].as_str(),
984 ) {
985 builder = builder.header(name, value);
986 }
987 }
988 }
989 "BASIC" => {
990 if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
991 if let (Some(user), Some(pass)) =
992 (params["Username"].as_str(), params["Password"].as_str())
993 {
994 builder = builder.basic_auth(user, Some(pass));
995 }
996 }
997 }
998 "OAUTH_CLIENT_CREDENTIALS" => {
999 if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
1002 if let (Some(client_id), Some(client_secret)) = (
1003 params["ClientParameters"]["ClientID"].as_str(),
1004 params["ClientParameters"]["ClientSecret"].as_str(),
1005 ) {
1006 builder = builder.basic_auth(client_id, Some(client_secret));
1007 }
1008 }
1009 }
1010 _ => {}
1011 }
1012 builder
1013}
1014
1015pub(crate) struct EventDispatchContext<'a> {
1021 pub(crate) state: &'a crate::state::SharedEventBridgeState,
1022 pub(crate) delivery: &'a std::sync::Arc<fakecloud_core::delivery::DeliveryBus>,
1023 pub(crate) lambda_state: Option<&'a fakecloud_lambda::SharedLambdaState>,
1024 pub(crate) logs_state: Option<&'a fakecloud_logs::SharedLogsState>,
1025 pub(crate) container_runtime:
1026 &'a Option<std::sync::Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
1027 pub(crate) account_id: &'a str,
1028 pub(crate) region: &'a str,
1029}
1030
1031pub(crate) fn dispatch_event_target(
1036 ctx: &EventDispatchContext,
1037 target: &crate::state::EventTarget,
1038 event_json: &Value,
1039 event_id: &str,
1040 detail_type: &str,
1041) {
1042 let arn = &target.arn;
1043 let event_str = event_json.to_string();
1044 let body_str = if let Some(ref transformer) = target.input_transformer {
1045 apply_input_transformer(transformer, event_json)
1046 } else if let Some(ref input) = target.input {
1047 input.clone()
1048 } else if let Some(ref input_path) = target.input_path {
1049 resolve_json_path(event_json, input_path)
1050 .map(|v| v.to_string())
1051 .unwrap_or_else(|| event_str.clone())
1052 } else {
1053 event_str.clone()
1054 };
1055
1056 if arn.contains(":sqs:") {
1057 let group_id = target
1058 .sqs_parameters
1059 .as_ref()
1060 .and_then(|p| p["MessageGroupId"].as_str())
1061 .map(|s| s.to_string());
1062 if group_id.is_some() {
1063 ctx.delivery.send_to_sqs_with_attrs(
1064 arn,
1065 &body_str,
1066 &HashMap::new(),
1067 group_id.as_deref(),
1068 None,
1069 );
1070 } else {
1071 ctx.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
1072 }
1073 } else if arn.contains(":sns:") {
1074 ctx.delivery
1075 .publish_to_sns(arn, &body_str, Some(detail_type));
1076 } else if arn.contains(":lambda:") {
1077 tracing::info!(
1078 function_arn = %arn,
1079 payload = %body_str,
1080 "EventBridge delivering to Lambda function"
1081 );
1082 let now = chrono::Utc::now();
1083 {
1084 let mut accounts = ctx.state.write();
1085 let s = accounts.get_or_create(ctx.account_id);
1086 s.lambda_invocations.push(crate::state::LambdaInvocation {
1087 function_arn: arn.clone(),
1088 payload: body_str.clone(),
1089 timestamp: now,
1090 });
1091 }
1092 if let Some(ls) = ctx.lambda_state {
1093 ls.write()
1094 .default_mut()
1095 .invocations
1096 .push(fakecloud_lambda::LambdaInvocation {
1097 function_arn: arn.clone(),
1098 payload: body_str.clone(),
1099 timestamp: now,
1100 source: "aws:events".to_string(),
1101 });
1102 }
1103 invoke_lambda_async(
1104 ctx.container_runtime,
1105 &ctx.lambda_state.cloned(),
1106 arn,
1107 &body_str,
1108 );
1109 } else if arn.contains(":logs:") {
1110 tracing::info!(
1111 log_group_arn = %arn,
1112 payload = %body_str,
1113 "EventBridge delivering to CloudWatch Logs"
1114 );
1115 let now = chrono::Utc::now();
1116 {
1117 let mut accounts = ctx.state.write();
1118 let s = accounts.get_or_create(ctx.account_id);
1119 s.log_deliveries.push(crate::state::LogDelivery {
1120 log_group_arn: arn.clone(),
1121 payload: body_str.clone(),
1122 timestamp: now,
1123 });
1124 }
1125 if let Some(log_state) = ctx.logs_state {
1126 deliver_to_logs(log_state, arn, &body_str, now);
1127 }
1128 } else if arn.contains(":kinesis:") {
1129 tracing::info!(
1130 stream_arn = %arn,
1131 "EventBridge delivering to Kinesis stream"
1132 );
1133 ctx.delivery.send_to_kinesis(arn, &body_str, event_id);
1134 } else if arn.contains(":states:") {
1135 tracing::info!(
1136 state_machine_arn = %arn,
1137 "EventBridge delivering to Step Functions"
1138 );
1139 ctx.delivery.start_stepfunctions_execution(arn, &body_str);
1140 let mut accounts = ctx.state.write();
1141 let s = accounts.get_or_create(ctx.account_id);
1142 s.step_function_executions
1143 .push(crate::state::StepFunctionExecution {
1144 state_machine_arn: arn.clone(),
1145 payload: body_str.clone(),
1146 timestamp: chrono::Utc::now(),
1147 });
1148 } else if arn.contains(":api-destination/") {
1149 let accounts = ctx.state.read();
1150 let empty = crate::state::EventBridgeState::new(ctx.account_id, ctx.region);
1151 let s = accounts.get(ctx.account_id).unwrap_or(&empty);
1152 let dest = s.api_destinations.values().find(|d| d.arn == *arn).cloned();
1153 let conn = dest.as_ref().and_then(|d| {
1154 s.connections
1155 .values()
1156 .find(|c| c.arn == d.connection_arn)
1157 .cloned()
1158 });
1159 drop(accounts);
1160 if let Some(dest) = dest {
1161 let url = dest.invocation_endpoint;
1162 let method = dest.http_method;
1163 let payload = body_str.clone();
1164 tokio::spawn(async move {
1165 let client = reqwest::Client::new();
1166 let mut req_builder = match method.as_str() {
1167 "GET" => client.get(&url),
1168 "PUT" => client.put(&url),
1169 "DELETE" => client.delete(&url),
1170 "PATCH" => client.patch(&url),
1171 "HEAD" => client.head(&url),
1172 _ => client.post(&url),
1173 };
1174 req_builder = req_builder.header("Content-Type", "application/json");
1175 if let Some(conn) = conn {
1176 req_builder = apply_connection_auth(req_builder, &conn);
1177 }
1178 let result = req_builder.body(payload).send().await;
1179 if let Err(e) = result {
1180 tracing::warn!(
1181 endpoint = %url,
1182 error = %e,
1183 "EventBridge ApiDestination delivery failed"
1184 );
1185 }
1186 });
1187 }
1188 } else if arn.starts_with("https://") || arn.starts_with("http://") {
1189 let url = arn.clone();
1190 let payload = body_str.clone();
1191 tokio::spawn(async move {
1192 let client = reqwest::Client::new();
1193 let result = client
1194 .post(&url)
1195 .header("Content-Type", "application/json")
1196 .body(payload)
1197 .send()
1198 .await;
1199 if let Err(e) = result {
1200 tracing::warn!(
1201 endpoint = %url,
1202 error = %e,
1203 "EventBridge HTTP target delivery failed"
1204 );
1205 }
1206 });
1207 }
1208}