1use super::*;
2
3pub(crate) fn validate_put_events_entry(
8 source: &str,
9 detail_type: &str,
10 detail: &str,
11) -> Result<(), Value> {
12 if source.is_empty() {
13 return Err(json!({
14 "ErrorCode": "InvalidArgument",
15 "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
16 }));
17 }
18 if detail_type.is_empty() {
19 return Err(json!({
20 "ErrorCode": "InvalidArgument",
21 "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
22 }));
23 }
24 if detail.is_empty() {
25 return Err(json!({
26 "ErrorCode": "InvalidArgument",
27 "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
28 }));
29 }
30 if serde_json::from_str::<Value>(detail).is_err() {
31 return Err(json!({
32 "ErrorCode": "MalformedDetail",
33 "ErrorMessage": "Detail is malformed.",
34 }));
35 }
36 Ok(())
37}
38
39pub(crate) fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
44 if let Some(s) = raw.as_str() {
45 return DateTime::parse_from_rfc3339(s)
46 .map(|dt| dt.with_timezone(&Utc))
47 .unwrap_or_else(|_| Utc::now());
48 }
49 if let Some(ts) = raw.as_f64() {
50 return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
51 .unwrap_or_else(Utc::now);
52 }
53 if let Some(ts) = raw.as_i64() {
54 return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
55 }
56 Utc::now()
57}
58
/// Returns `true` when `action` is one of the API action names listed as
/// mutating below.
///
/// The comparison is exact and case-sensitive; any name not in the table
/// yields `false`.
pub(crate) fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateEventBus",
        "DeleteEventBus",
        "UpdateEventBus",
        "PutRule",
        "DeleteRule",
        "EnableRule",
        "DisableRule",
        "PutTargets",
        "RemoveTargets",
        "PutEvents",
        "PutPermission",
        "RemovePermission",
        "TagResource",
        "UntagResource",
        "CreateArchive",
        "UpdateArchive",
        "DeleteArchive",
        "CreateConnection",
        "UpdateConnection",
        "DeleteConnection",
        "DeauthorizeConnection",
        "CreateApiDestination",
        "UpdateApiDestination",
        "DeleteApiDestination",
        "StartReplay",
        "CancelReplay",
        "CreatePartnerEventSource",
        "DeletePartnerEventSource",
        "ActivateEventSource",
        "DeactivateEventSource",
        "PutPartnerEvents",
        "CreateEndpoint",
        "DeleteEndpoint",
        "UpdateEndpoint",
    ];
    MUTATING_ACTIONS.contains(&action)
}
99
100pub(crate) fn parse_tags(body: &Value) -> BTreeMap<String, String> {
101 let mut tags = BTreeMap::new();
102 if let Some(arr) = body["Tags"].as_array() {
103 for tag in arr {
104 if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
105 tags.insert(key.to_string(), val.to_string());
106 }
107 }
108 }
109 tags
110}
111
112pub fn parse_target(target: &Value) -> EventTarget {
113 EventTarget {
114 id: target["Id"].as_str().unwrap_or("").to_string(),
115 arn: target["Arn"].as_str().unwrap_or("").to_string(),
116 input: target["Input"].as_str().map(|s| s.to_string()),
117 input_path: target["InputPath"].as_str().map(|s| s.to_string()),
118 input_transformer: target.get("InputTransformer").cloned(),
119 sqs_parameters: target.get("SqsParameters").cloned(),
120 role_arn: target["RoleArn"].as_str().map(|s| s.to_string()),
121 dead_letter_config: target.get("DeadLetterConfig").cloned(),
122 retry_policy: target.get("RetryPolicy").cloned(),
123 ecs_parameters: target.get("EcsParameters").cloned(),
124 batch_parameters: target.get("BatchParameters").cloned(),
125 kinesis_parameters: target.get("KinesisParameters").cloned(),
126 redshift_data_parameters: target.get("RedshiftDataParameters").cloned(),
127 http_parameters: target.get("HttpParameters").cloned(),
128 sage_maker_pipeline_parameters: target.get("SageMakerPipelineParameters").cloned(),
129 app_sync_parameters: target.get("AppSyncParameters").cloned(),
130 run_command_parameters: target.get("RunCommandParameters").cloned(),
131 }
132}
133
134pub(crate) fn target_to_json(t: &EventTarget) -> Value {
135 let mut obj = json!({ "Id": t.id, "Arn": t.arn });
136 if let Some(ref input) = t.input {
137 obj["Input"] = json!(input);
138 }
139 if let Some(ref input_path) = t.input_path {
140 obj["InputPath"] = json!(input_path);
141 }
142 if let Some(ref it) = t.input_transformer {
143 obj["InputTransformer"] = it.clone();
144 }
145 if let Some(ref sp) = t.sqs_parameters {
146 obj["SqsParameters"] = sp.clone();
147 }
148 if let Some(ref ra) = t.role_arn {
149 obj["RoleArn"] = json!(ra);
150 }
151 if let Some(ref dlc) = t.dead_letter_config {
152 obj["DeadLetterConfig"] = dlc.clone();
153 }
154 if let Some(ref rp) = t.retry_policy {
155 obj["RetryPolicy"] = rp.clone();
156 }
157 if let Some(ref p) = t.ecs_parameters {
158 obj["EcsParameters"] = p.clone();
159 }
160 if let Some(ref p) = t.batch_parameters {
161 obj["BatchParameters"] = p.clone();
162 }
163 if let Some(ref p) = t.kinesis_parameters {
164 obj["KinesisParameters"] = p.clone();
165 }
166 if let Some(ref p) = t.redshift_data_parameters {
167 obj["RedshiftDataParameters"] = p.clone();
168 }
169 if let Some(ref p) = t.http_parameters {
170 obj["HttpParameters"] = p.clone();
171 }
172 if let Some(ref p) = t.sage_maker_pipeline_parameters {
173 obj["SageMakerPipelineParameters"] = p.clone();
174 }
175 if let Some(ref p) = t.app_sync_parameters {
176 obj["AppSyncParameters"] = p.clone();
177 }
178 if let Some(ref p) = t.run_command_parameters {
179 obj["RunCommandParameters"] = p.clone();
180 }
181 obj
182}
183
184pub(crate) fn find_tags_mut<'a>(
185 state: &'a mut crate::state::EventBridgeState,
186 arn: &str,
187) -> Result<&'a mut BTreeMap<String, String>, AwsServiceError> {
188 for bus in state.buses.values_mut() {
190 if bus.arn == arn {
191 return Ok(&mut bus.tags);
192 }
193 }
194 for rule in state.rules.values_mut() {
196 if rule.arn == arn {
197 return Ok(&mut rule.tags);
198 }
199 }
200
201 let error_msg = if arn.contains(":rule/") {
203 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
205 if let Some(rule_path) = parts.first() {
206 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
207 format!("Rule {rule_name} does not exist on EventBus {bus}.")
208 } else {
209 format!("Rule {} does not exist on EventBus default.", rule_path)
210 }
211 } else {
212 format!("Resource {arn} not found.")
213 }
214 } else {
215 format!("Resource {arn} not found.")
216 };
217
218 Err(AwsServiceError::aws_error(
219 StatusCode::BAD_REQUEST,
220 "ResourceNotFoundException",
221 error_msg,
222 ))
223}
224
225pub(crate) fn find_tags<'a>(
226 state: &'a crate::state::EventBridgeState,
227 arn: &str,
228) -> Result<&'a BTreeMap<String, String>, AwsServiceError> {
229 for bus in state.buses.values() {
230 if bus.arn == arn {
231 return Ok(&bus.tags);
232 }
233 }
234 for rule in state.rules.values() {
235 if rule.arn == arn {
236 return Ok(&rule.tags);
237 }
238 }
239
240 let error_msg = if arn.contains(":rule/") {
241 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
242 if let Some(rule_path) = parts.first() {
243 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
244 format!("Rule {rule_name} does not exist on EventBus {bus}.")
245 } else {
246 format!("Rule {} does not exist on EventBus default.", rule_path)
247 }
248 } else {
249 format!("Resource {arn} not found.")
250 }
251 } else {
252 format!("Resource {arn} not found.")
253 };
254
255 Err(AwsServiceError::aws_error(
256 StatusCode::BAD_REQUEST,
257 "ResourceNotFoundException",
258 error_msg,
259 ))
260}
261
262pub(crate) fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
263 let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
264 AwsServiceError::aws_error(
265 StatusCode::BAD_REQUEST,
266 "InvalidEventPatternException",
267 "Event pattern is not valid. Reason: Invalid JSON",
268 )
269 })?;
270
271 validate_pattern_values(&parsed, "")?;
272 Ok(())
273}
274
275pub(crate) fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
276 match value {
277 Value::Object(obj) => {
278 for (key, val) in obj {
279 let new_path = if path.is_empty() {
280 key.clone()
281 } else {
282 format!("{path}.{key}")
283 };
284 match val {
285 Value::Object(_) => validate_pattern_values(val, &new_path)?,
286 Value::Array(_) => {} _ => {
288 return Err(AwsServiceError::aws_error(
289 StatusCode::BAD_REQUEST,
290 "InvalidEventPatternException",
291 format!(
292 "Event pattern is not valid. Reason: '{}' must be an object or an array",
293 key
294 ),
295 ));
296 }
297 }
298 }
299 Ok(())
300 }
301 _ => Ok(()),
302 }
303}
304
305pub(crate) fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
306 match auth_type {
307 "API_KEY" => {
308 let mut resp = json!({});
309 if let Some(api_key) = params.get("ApiKeyAuthParameters") {
310 resp["ApiKeyAuthParameters"] = json!({
311 "ApiKeyName": api_key["ApiKeyName"],
312 });
313 }
314 resp
315 }
316 "BASIC" => {
317 let mut resp = json!({});
318 if let Some(basic) = params.get("BasicAuthParameters") {
319 resp["BasicAuthParameters"] = json!({
320 "Username": basic["Username"],
321 });
322 }
323 resp
324 }
325 "OAUTH_CLIENT_CREDENTIALS" => {
326 let mut resp = json!({});
327 if let Some(oauth) = params.get("OAuthParameters") {
328 resp["OAuthParameters"] = json!({
329 "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
330 "HttpMethod": oauth["HttpMethod"],
331 "ClientParameters": {
332 "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
333 },
334 });
335 }
336 resp
337 }
338 _ => params.clone(),
339 }
340}
341
342pub(crate) fn matches_pattern(
344 pattern_json: Option<&str>,
345 source: &str,
346 detail_type: &str,
347 detail: &str,
348 account: &str,
349 region: &str,
350 resources: &[String],
351) -> bool {
352 let pattern_json = match pattern_json {
353 Some(p) => p,
354 None => return true,
355 };
356
357 let pattern: Value = match serde_json::from_str(pattern_json) {
358 Ok(v) => v,
359 Err(_) => return false,
360 };
361
362 if !pattern.is_object() {
363 return false;
364 }
365
366 let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
367 let event = json!({
368 "source": source,
369 "detail-type": detail_type,
370 "detail": detail_value,
371 "account": account,
372 "region": region,
373 "resources": resources,
374 });
375
376 matches_value(&pattern, &event)
377}
378
379pub(crate) fn matches_value(pattern: &Value, event_value: &Value) -> bool {
380 match pattern {
381 Value::Object(obj) => {
382 if let Some(Value::Array(alternatives)) = obj.get("$or") {
385 return alternatives
386 .iter()
387 .any(|alt| matches_value(alt, event_value));
388 }
389 for (key, sub_pattern) in obj {
390 if key == "$or" {
391 continue;
392 }
393 let sub_value = &event_value[key];
394 if !matches_value(sub_pattern, sub_value) {
395 return false;
396 }
397 }
398 true
399 }
400 Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
401 _ => false,
402 }
403}
404
405pub(crate) fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
406 match pattern_elem {
407 Value::Object(obj) => {
408 if let Some(prefix_val) = obj.get("prefix") {
409 if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
410 return actual.starts_with(prefix);
411 }
412 return false;
413 }
414 if let Some(suffix_val) = obj.get("suffix") {
415 if let (Some(suffix), Some(actual)) = (suffix_val.as_str(), event_value.as_str()) {
416 return actual.ends_with(suffix);
417 }
418 return false;
419 }
420 if let Some(eqic_val) = obj.get("equals-ignore-case") {
421 if let (Some(expected), Some(actual)) = (eqic_val.as_str(), event_value.as_str()) {
422 return expected.eq_ignore_ascii_case(actual);
423 }
424 return false;
425 }
426 if let Some(cidr_val) = obj.get("cidr") {
427 if let (Some(cidr), Some(actual)) = (cidr_val.as_str(), event_value.as_str()) {
428 return cidr_matches(cidr, actual);
429 }
430 return false;
431 }
432 if let Some(wild_val) = obj.get("wildcard") {
433 if let (Some(pattern), Some(actual)) = (wild_val.as_str(), event_value.as_str()) {
434 return wildcard_matches(pattern, actual);
435 }
436 return false;
437 }
438 if let Some(exists_val) = obj.get("exists") {
439 let should_exist = exists_val.as_bool().unwrap_or(true);
440 let does_exist = !event_value.is_null();
441 return should_exist == does_exist;
442 }
443 if let Some(anything_but_val) = obj.get("anything-but") {
444 return match anything_but_val {
445 Value::String(s) => event_value.as_str() != Some(s.as_str()),
446 Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
447 Value::Number(_) => event_value != anything_but_val,
448 Value::Object(nested) => {
454 let fv = event_value.as_str();
455 if let Some(p) = nested.get("prefix").and_then(|v| v.as_str()) {
456 !fv.is_some_and(|s| s.starts_with(p))
457 } else if let Some(suf) = nested.get("suffix").and_then(|v| v.as_str()) {
458 !fv.is_some_and(|s| s.ends_with(suf))
459 } else if let Some(eic) =
460 nested.get("equals-ignore-case").and_then(|v| v.as_str())
461 {
462 !fv.is_some_and(|s| s.eq_ignore_ascii_case(eic))
463 } else {
464 true
466 }
467 }
468 _ => true,
469 };
470 }
471 if let Some(numeric_val) = obj.get("numeric") {
472 return matches_numeric(numeric_val, event_value);
473 }
474 false
475 }
476 _ => values_equal(pattern_elem, event_value),
477 }
478}
479
/// Matches `actual` against a wildcard `pattern`.
///
/// `*` stands for any run of characters, including none. A backslash makes
/// the following character literal (so `\*` is a literal asterisk); a trailing
/// lone backslash is dropped. A pattern without any unescaped `*` must equal
/// `actual` exactly.
pub(crate) fn wildcard_matches(pattern: &str, actual: &str) -> bool {
    // Split the pattern into the literal pieces found between unescaped `*`s.
    let mut literals: Vec<String> = Vec::new();
    let mut piece = String::new();
    let mut chars = pattern.chars();
    while let Some(c) = chars.next() {
        match c {
            '\\' => {
                if let Some(escaped) = chars.next() {
                    piece.push(escaped);
                }
            }
            '*' => literals.push(std::mem::take(&mut piece)),
            other => piece.push(other),
        }
    }
    literals.push(piece);

    let Some((head, tail)) = literals.split_first() else {
        return false;
    };
    let Some((last, middle)) = tail.split_last() else {
        // No wildcard at all: exact comparison.
        return head == actual;
    };

    // The first literal is anchored at the start ...
    let Some(mut remaining) = actual.strip_prefix(head.as_str()) else {
        return false;
    };
    // ... inner literals are consumed left to right at their earliest position ...
    for literal in middle {
        match remaining.find(literal.as_str()) {
            Some(at) => remaining = &remaining[at + literal.len()..],
            None => return false,
        }
    }
    // ... and the final literal is anchored at the end of what is left.
    remaining.ends_with(last.as_str())
}
526
/// Tests whether the IPv4 address `actual` lies inside the IPv4 block `cidr`
/// (written as `a.b.c.d/prefix`).
///
/// Returns `false` when `cidr` has no `/`, when the prefix is not a number in
/// `0..=32`, or when either address is not a dotted quad. A `/0` block
/// contains every valid address.
pub(crate) fn cidr_matches(cidr: &str, actual: &str) -> bool {
    let Some((net_str, prefix_str)) = cidr.split_once('/') else {
        return false;
    };
    let Some(prefix) = prefix_str
        .parse::<u32>()
        .ok()
        .filter(|bits| *bits <= 32)
    else {
        return false;
    };
    let Some(net) = parse_ipv4(net_str) else {
        return false;
    };
    let Some(value) = parse_ipv4(actual) else {
        return false;
    };

    // Shifting a u32 by 32 bits would overflow, so /0 gets an all-zero mask.
    let mask = if prefix == 0 {
        0
    } else {
        u32::MAX << (32 - prefix)
    };
    (net & mask) == (value & mask)
}

/// Parses a dotted-quad IPv4 address into its big-endian `u32` value.
///
/// Returns `None` unless the input has exactly four `.`-separated fields,
/// each parsing as an integer no greater than 255.
fn parse_ipv4(s: &str) -> Option<u32> {
    let fields: Vec<&str> = s.split('.').collect();
    if fields.len() != 4 {
        return None;
    }
    fields.iter().try_fold(0u32, |packed, field| {
        let octet: u32 = field.parse().ok()?;
        (octet <= 255).then_some((packed << 8) | octet)
    })
}
567
568#[allow(clippy::too_many_arguments)]
572pub(crate) fn archive_matching_event(
573 state: &mut crate::state::EventBridgeState,
574 event: &PutEvent,
575 event_bus_name: &str,
576 source: &str,
577 detail_type: &str,
578 detail: &str,
579 account_id: &str,
580 region: &str,
581 resources: &[String],
582) {
583 let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
584 for akey in archive_keys {
585 let (archive_bus, archive_pattern, archive_enabled) = {
586 let a = &state.archives[&akey];
587 (
588 state.resolve_bus_name(&a.event_source_arn),
589 a.event_pattern.clone(),
590 a.state == "ENABLED",
591 )
592 };
593 if archive_bus != event_bus_name || !archive_enabled {
594 continue;
595 }
596 let pattern_matches = matches_pattern(
597 archive_pattern.as_deref(),
598 source,
599 detail_type,
600 detail,
601 account_id,
602 region,
603 resources,
604 );
605 if !pattern_matches {
606 continue;
607 }
608 if let Some(archive) = state.archives.get_mut(&akey) {
609 archive.event_count += 1;
610 archive.size_bytes += detail.len() as i64;
611 archive.events.push(event.clone());
612 }
613 }
614}
615
616#[allow(clippy::too_many_arguments)]
621pub(crate) fn collect_replay_events_with_targets(
622 state: &crate::state::EventBridgeState,
623 archive_name: &str,
624 bus_name: &str,
625 event_start_time: DateTime<Utc>,
626 event_end_time: DateTime<Utc>,
627 account_id: &str,
628 region: &str,
629) -> Vec<(PutEvent, Vec<EventTarget>)> {
630 let Some(archive) = state.archives.get(archive_name) else {
631 return Vec::new();
632 };
633
634 let replay_events: Vec<PutEvent> = archive
635 .events
636 .iter()
637 .filter(|e| e.time >= event_start_time && e.time < event_end_time)
638 .cloned()
639 .collect();
640
641 let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
642 for event in replay_events {
643 let matching_targets: Vec<EventTarget> = state
644 .rules
645 .values()
646 .filter(|r| {
647 r.event_bus_name == bus_name
648 && r.state == "ENABLED"
649 && matches_pattern(
650 r.event_pattern.as_deref(),
651 &event.source,
652 &event.detail_type,
653 &event.detail,
654 account_id,
655 region,
656 &event.resources,
657 )
658 })
659 .flat_map(|r| r.targets.clone())
660 .collect();
661
662 if !matching_targets.is_empty() {
663 events_to_deliver.push((event, matching_targets));
664 }
665 }
666 events_to_deliver
667}
668
669pub(crate) fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
670 let arr = match numeric_arr.as_array() {
671 Some(a) => a,
672 None => return false,
673 };
674 let actual = match event_value.as_f64() {
675 Some(n) => n,
676 None => return false,
677 };
678 let mut i = 0;
679 while i + 1 < arr.len() {
680 let op = match arr[i].as_str() {
681 Some(s) => s,
682 None => return false,
683 };
684 let threshold = match arr[i + 1].as_f64() {
685 Some(n) => n,
686 None => return false,
687 };
688 let ok = match op {
689 ">" => actual > threshold,
690 ">=" => actual >= threshold,
691 "<" => actual < threshold,
692 "<=" => actual <= threshold,
693 "=" => (actual - threshold).abs() < f64::EPSILON,
694 _ => return false,
695 };
696 if !ok {
697 return false;
698 }
699 i += 2;
700 }
701 true
702}
703
704pub(crate) fn values_equal(a: &Value, b: &Value) -> bool {
705 a == b
706}
707
708pub(crate) fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
710 let path = path.strip_prefix('$').unwrap_or(path);
711 let mut current = event;
712 for segment in path.split('.') {
713 if segment.is_empty() {
714 continue;
715 }
716 current = current.get(segment)?;
717 }
718 Some(current.clone())
719}
720
721pub(crate) fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
723 let input_paths_map = transformer
724 .get("InputPathsMap")
725 .and_then(|v| v.as_object())
726 .cloned()
727 .unwrap_or_default();
728 let template = transformer
729 .get("InputTemplate")
730 .and_then(|v| v.as_str())
731 .unwrap_or("")
732 .to_string();
733
734 let mut resolved: HashMap<String, Value> = HashMap::new();
736 for (var_name, path_val) in &input_paths_map {
737 if let Some(path_str) = path_val.as_str() {
738 if let Some(val) = resolve_json_path(event, path_str) {
739 resolved.insert(var_name.clone(), val);
740 }
741 }
742 }
743
744 let mut result = template;
746 for (var_name, val) in &resolved {
747 let placeholder = format!("<{var_name}>");
748 let replacement = match val {
749 Value::String(s) => s.clone(),
750 other => other.to_string(),
751 };
752 result = result.replace(&placeholder, &replacement);
753 }
754
755 result
756}
757
758pub(crate) fn missing(name: &str) -> AwsServiceError {
759 AwsServiceError::aws_error(
760 StatusCode::BAD_REQUEST,
761 "ValidationException",
762 format!("The request must contain the parameter {name}"),
763 )
764}
765
/// Extracts the function name from a Lambda function ARN.
///
/// The ARN is split on `:`; when the sixth field is `function` and a seventh
/// field exists, that seventh field is returned (any qualifier after it is
/// ignored). Every other input is returned unchanged.
pub(crate) fn function_name_from_arn(arn: &str) -> &str {
    let mut fields = arn.split(':');
    // `nth(5)` consumes fields 0..=5, so the following `next()` is field 6.
    match (fields.nth(5), fields.next()) {
        (Some("function"), Some(name)) => name,
        _ => arn,
    }
}
778
779pub(crate) fn invoke_lambda_async(
782 container_runtime: &Option<Arc<ContainerRuntime>>,
783 lambda_state: &Option<SharedLambdaState>,
784 function_arn: &str,
785 payload: &str,
786) {
787 let runtime = match container_runtime {
788 Some(rt) => rt.clone(),
789 None => return,
790 };
791 let lambda_state = match lambda_state {
792 Some(ls) => ls.clone(),
793 None => return,
794 };
795 let func_name = function_name_from_arn(function_arn).to_string();
796 let payload = payload.as_bytes().to_vec();
797
798 tokio::spawn(async move {
799 let resolved = {
800 let accounts = lambda_state.read();
801 let state = accounts.default_ref();
802 state.functions.get(&func_name).cloned().map(|func| {
803 let mut layer_zips: Vec<Vec<u8>> = Vec::with_capacity(func.layers.len());
804 for attached in &func.layers {
805 if let Some(bytes) = fakecloud_lambda::extras::parse_layer_version_arn(
806 &attached.arn,
807 )
808 .and_then(|(acct, name, ver)| {
809 accounts
810 .get(&acct)
811 .and_then(|s| s.layers.get(&name))
812 .and_then(|l| l.versions.iter().find(|v| v.version == ver))
813 .and_then(|v| v.code_zip.clone())
814 }) {
815 layer_zips.push(bytes);
816 }
817 }
818 (func, layer_zips)
819 })
820 };
821 let (func, layer_zips) = match resolved {
822 Some(pair) => pair,
823 None => {
824 tracing::warn!(
825 function = %func_name,
826 "EventBridge Lambda target not found, skipping invocation"
827 );
828 return;
829 }
830 };
831 match runtime.invoke(&func, &payload, &layer_zips).await {
832 Ok(_) => {
833 tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
834 }
835 Err(e) => {
836 tracing::warn!(
837 function = %func_name,
838 error = %e,
839 "EventBridge Lambda invocation failed"
840 );
841 }
842 }
843 });
844}
845
846pub(crate) fn deliver_to_logs(
849 logs_state: &SharedLogsState,
850 log_group_arn: &str,
851 payload: &str,
852 timestamp: chrono::DateTime<chrono::Utc>,
853) {
854 let group_name = if log_group_arn.contains(":log-group:") {
857 log_group_arn
858 .split(":log-group:")
859 .nth(1)
860 .unwrap_or(log_group_arn)
861 .trim_end_matches(":*")
862 } else {
863 log_group_arn
864 };
865
866 let stream_name = "events".to_string();
867 let ts_millis = timestamp.timestamp_millis();
868
869 let mut accounts = logs_state.write();
870 let state = accounts.default_mut();
871 let region = state.region.clone();
872 let account_id = state.account_id.clone();
873
874 let group = state
876 .log_groups
877 .entry(group_name.to_string())
878 .or_insert_with(|| fakecloud_logs::LogGroup {
879 name: group_name.to_string(),
880 arn: Arn::new(
881 "logs",
882 ®ion,
883 &account_id,
884 &format!("log-group:{group_name}"),
885 )
886 .to_string(),
887 creation_time: ts_millis,
888 retention_in_days: None,
889 kms_key_id: None,
890 tags: std::collections::BTreeMap::new(),
891 log_streams: std::collections::BTreeMap::new(),
892 stored_bytes: 0,
893 subscription_filters: Vec::new(),
894 data_protection_policy: None,
895 index_policies: Vec::new(),
896 transformer: None,
897 deletion_protection: false,
898 log_group_class: Some("STANDARD".to_string()),
899 });
900
901 let stream = group
902 .log_streams
903 .entry(stream_name.clone())
904 .or_insert_with(|| fakecloud_logs::LogStream {
905 name: stream_name,
906 arn: format!("{}:log-stream:events", group.arn),
907 creation_time: ts_millis,
908 first_event_timestamp: None,
909 last_event_timestamp: None,
910 last_ingestion_time: None,
911 upload_sequence_token: "1".to_string(),
912 events: Vec::new(),
913 });
914
915 stream.events.push(fakecloud_logs::LogEvent {
916 timestamp: ts_millis,
917 message: payload.to_string(),
918 ingestion_time: ts_millis,
919 });
920 stream.last_event_timestamp = Some(ts_millis);
921 stream.last_ingestion_time = Some(ts_millis);
922 if stream.first_event_timestamp.is_none() {
923 stream.first_event_timestamp = Some(ts_millis);
924 }
925}
926
927pub(crate) fn apply_connection_auth(
929 mut builder: reqwest::RequestBuilder,
930 conn: &Connection,
931) -> reqwest::RequestBuilder {
932 match conn.authorization_type.as_str() {
933 "API_KEY" => {
934 if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
935 if let (Some(name), Some(value)) = (
936 params["ApiKeyName"].as_str(),
937 params["ApiKeyValue"].as_str(),
938 ) {
939 builder = builder.header(name, value);
940 }
941 }
942 }
943 "BASIC" => {
944 if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
945 if let (Some(user), Some(pass)) =
946 (params["Username"].as_str(), params["Password"].as_str())
947 {
948 builder = builder.basic_auth(user, Some(pass));
949 }
950 }
951 }
952 "OAUTH_CLIENT_CREDENTIALS" => {
953 if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
956 if let (Some(client_id), Some(client_secret)) = (
957 params["ClientParameters"]["ClientID"].as_str(),
958 params["ClientParameters"]["ClientSecret"].as_str(),
959 ) {
960 builder = builder.basic_auth(client_id, Some(client_secret));
961 }
962 }
963 }
964 _ => {}
965 }
966 builder
967}
968
/// Borrowed handles and identifiers that `dispatch_event_target` needs in
/// order to deliver one event to one target.
pub(crate) struct EventDispatchContext<'a> {
    /// Shared EventBridge state. `dispatch_event_target` records Lambda, Logs
    /// and Step Functions deliveries in it and reads API destinations and
    /// connections from it.
    pub(crate) state: &'a crate::state::SharedEventBridgeState,
    /// Delivery bus used for SQS, SNS, Kinesis and Step Functions targets.
    pub(crate) delivery: &'a std::sync::Arc<fakecloud_core::delivery::DeliveryBus>,
    /// Lambda service state, when available; used to record invocations and
    /// passed on to `invoke_lambda_async`.
    pub(crate) lambda_state: Option<&'a fakecloud_lambda::SharedLambdaState>,
    /// Logs service state, when available; log-group targets are written into
    /// it via `deliver_to_logs`.
    pub(crate) logs_state: Option<&'a fakecloud_logs::SharedLogsState>,
    /// Container runtime handed to `invoke_lambda_async`; when `None` that
    /// function returns without invoking anything.
    pub(crate) container_runtime:
        &'a Option<std::sync::Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
    /// Account whose EventBridge state is read and updated during dispatch.
    pub(crate) account_id: &'a str,
    /// Region passed to `EventBridgeState::new` when building the fallback
    /// state for API-destination lookups.
    pub(crate) region: &'a str,
}
984
/// Delivers one event to one rule target, choosing the delivery mechanism
/// from substrings of the target ARN.
///
/// The body sent to the target is, in order of precedence: the rendered
/// `InputTransformer`, the constant `Input`, the part of the event selected
/// by `InputPath` (the full event if the path does not resolve), or the full
/// event JSON.
///
/// Branches are tested in this order: `:sqs:`, `:sns:`, `:lambda:`, `:logs:`,
/// `:kinesis:`, `:states:`, `:api-destination/`, and finally a plain
/// `http://` / `https://` URL. Targets matching none of them are ignored.
/// HTTP-based deliveries and Lambda invocations run on spawned Tokio tasks;
/// their failures are only logged.
pub(crate) fn dispatch_event_target(
    ctx: &EventDispatchContext,
    target: &crate::state::EventTarget,
    event_json: &Value,
    event_id: &str,
    detail_type: &str,
) {
    let arn = &target.arn;
    let event_str = event_json.to_string();
    // Pick the payload: transformer > constant input > input path > whole event.
    let body_str = if let Some(ref transformer) = target.input_transformer {
        apply_input_transformer(transformer, event_json)
    } else if let Some(ref input) = target.input {
        input.clone()
    } else if let Some(ref input_path) = target.input_path {
        resolve_json_path(event_json, input_path)
            .map(|v| v.to_string())
            .unwrap_or_else(|| event_str.clone())
    } else {
        event_str.clone()
    };

    if arn.contains(":sqs:") {
        // A configured MessageGroupId selects the attribute-aware send call.
        let group_id = target
            .sqs_parameters
            .as_ref()
            .and_then(|p| p["MessageGroupId"].as_str())
            .map(|s| s.to_string());
        if group_id.is_some() {
            ctx.delivery.send_to_sqs_with_attrs(
                arn,
                &body_str,
                &HashMap::new(),
                group_id.as_deref(),
                None,
            );
        } else {
            ctx.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
        }
    } else if arn.contains(":sns:") {
        ctx.delivery
            .publish_to_sns(arn, &body_str, Some(detail_type));
    } else if arn.contains(":lambda:") {
        tracing::info!(
            function_arn = %arn,
            payload = %body_str,
            "EventBridge delivering to Lambda function"
        );
        let now = chrono::Utc::now();
        // Record the invocation in EventBridge state; the scope releases the
        // write guard before the Lambda state is locked below.
        {
            let mut accounts = ctx.state.write();
            let s = accounts.get_or_create(ctx.account_id);
            s.lambda_invocations.push(crate::state::LambdaInvocation {
                function_arn: arn.clone(),
                payload: body_str.clone(),
                timestamp: now,
            });
        }
        // Also record it in the Lambda service's default account, if present.
        if let Some(ls) = ctx.lambda_state {
            ls.write()
                .default_mut()
                .invocations
                .push(fakecloud_lambda::LambdaInvocation {
                    function_arn: arn.clone(),
                    payload: body_str.clone(),
                    timestamp: now,
                    source: "aws:events".to_string(),
                });
        }
        invoke_lambda_async(
            ctx.container_runtime,
            &ctx.lambda_state.cloned(),
            arn,
            &body_str,
        );
    } else if arn.contains(":logs:") {
        tracing::info!(
            log_group_arn = %arn,
            payload = %body_str,
            "EventBridge delivering to CloudWatch Logs"
        );
        let now = chrono::Utc::now();
        // Record the delivery, then release the guard before touching Logs state.
        {
            let mut accounts = ctx.state.write();
            let s = accounts.get_or_create(ctx.account_id);
            s.log_deliveries.push(crate::state::LogDelivery {
                log_group_arn: arn.clone(),
                payload: body_str.clone(),
                timestamp: now,
            });
        }
        if let Some(log_state) = ctx.logs_state {
            deliver_to_logs(log_state, arn, &body_str, now);
        }
    } else if arn.contains(":kinesis:") {
        tracing::info!(
            stream_arn = %arn,
            "EventBridge delivering to Kinesis stream"
        );
        ctx.delivery.send_to_kinesis(arn, &body_str, event_id);
    } else if arn.contains(":states:") {
        tracing::info!(
            state_machine_arn = %arn,
            "EventBridge delivering to Step Functions"
        );
        ctx.delivery.start_stepfunctions_execution(arn, &body_str);
        // The execution is recorded after it has been handed to the delivery bus.
        let mut accounts = ctx.state.write();
        let s = accounts.get_or_create(ctx.account_id);
        s.step_function_executions
            .push(crate::state::StepFunctionExecution {
                state_machine_arn: arn.clone(),
                payload: body_str.clone(),
                timestamp: chrono::Utc::now(),
            });
    } else if arn.contains(":api-destination/") {
        let accounts = ctx.state.read();
        // Fallback state so the lookups below work when the account has no state yet.
        let empty = crate::state::EventBridgeState::new(ctx.account_id, ctx.region);
        let s = accounts.get(ctx.account_id).unwrap_or(&empty);
        let dest = s.api_destinations.values().find(|d| d.arn == *arn).cloned();
        let conn = dest.as_ref().and_then(|d| {
            s.connections
                .values()
                .find(|c| c.arn == d.connection_arn)
                .cloned()
        });
        // Release the read guard before spawning the HTTP task.
        drop(accounts);
        if let Some(dest) = dest {
            let url = dest.invocation_endpoint;
            let method = dest.http_method;
            let payload = body_str.clone();
            tokio::spawn(async move {
                let client = reqwest::Client::new();
                // Any method not listed here is sent as POST.
                let mut req_builder = match method.as_str() {
                    "GET" => client.get(&url),
                    "PUT" => client.put(&url),
                    "DELETE" => client.delete(&url),
                    "PATCH" => client.patch(&url),
                    "HEAD" => client.head(&url),
                    _ => client.post(&url),
                };
                req_builder = req_builder.header("Content-Type", "application/json");
                if let Some(conn) = conn {
                    req_builder = apply_connection_auth(req_builder, &conn);
                }
                let result = req_builder.body(payload).send().await;
                if let Err(e) = result {
                    tracing::warn!(
                        endpoint = %url,
                        error = %e,
                        "EventBridge ApiDestination delivery failed"
                    );
                }
            });
        }
    } else if arn.starts_with("https://") || arn.starts_with("http://") {
        // The "ARN" is a plain URL: POST the body to it directly.
        let url = arn.clone();
        let payload = body_str.clone();
        tokio::spawn(async move {
            let client = reqwest::Client::new();
            let result = client
                .post(&url)
                .header("Content-Type", "application/json")
                .body(payload)
                .send()
                .await;
            if let Err(e) = result {
                tracing::warn!(
                    endpoint = %url,
                    error = %e,
                    "EventBridge HTTP target delivery failed"
                );
            }
        });
    }
}