// fakecloud-eventbridge 0.19.0
//
// EventBridge implementation for FakeCloud
// Documentation
// Auto-extracted from service.rs as part of carryover service.rs split.

#![allow(clippy::too_many_arguments)]

use chrono::Utc;
use http::StatusCode;
use serde_json::{json, Value};

use fakecloud_core::pagination::paginate;
use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};

use super::*;

impl EventBridgeService {
    /// Handles `CreatePartnerEventSource`: stores a new partner event source in
    /// the requesting account's state and answers with its ARN.
    ///
    /// The JSON body must carry the string fields `Name` (1–256 characters)
    /// and `Account` (exactly 12 characters); they are validated in that
    /// order, so a bad `Name` is reported before `Account` is looked at.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by the validation helpers for a missing
    /// or badly sized field, and returns a 409
    /// `ResourceAlreadyExistsException` when a source with the same name is
    /// already stored for this account.
    pub(super) fn create_partner_event_source(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let payload = req.json_body();

        let name_value = &payload["Name"];
        validate_required("Name", name_value)?;
        let name = name_value
            .as_str()
            .map(str::to_owned)
            .ok_or_else(|| missing("Name"))?;
        validate_string_length("name", &name, 1, 256)?;

        let account_value = &payload["Account"];
        validate_required("Account", account_value)?;
        let account = account_value
            .as_str()
            .map(str::to_owned)
            .ok_or_else(|| missing("Account"))?;
        validate_string_length("account", &account, 12, 12)?;

        // The write guard is held for the whole check-then-insert sequence.
        let mut guard = self.state.write();
        let account_state = guard.get_or_create(&req.account_id);

        let already_present = account_state.partner_event_sources.contains_key(&name);
        if already_present {
            return Err(AwsServiceError::aws_error(
                StatusCode::CONFLICT,
                "ResourceAlreadyExistsException",
                format!("Partner event source {name} already exists."),
            ));
        }

        let arn = format!(
            "arn:aws:events:{}::event-source/aws.partner/{}",
            account_state.region, name
        );
        let created_at = Utc::now();
        let source = PartnerEventSource {
            name: name.clone(),
            arn: arn.clone(),
            account,
            creation_time: created_at,
            expiration_time: None,
            // New sources start out active.
            state: "ACTIVE".to_string(),
        };
        account_state.partner_event_sources.insert(name, source);

        let response_body = json!({ "EventSourceArn": arn });
        Ok(AwsResponse::ok_json(response_body))
    }

    /// Handles `DeletePartnerEventSource`: removes the named partner event
    /// source when it is stored with the given `Account`.
    ///
    /// Smithy declares `Name` (length 1..=256) and `Account` (length 12..=12)
    /// as `@required`. Constraint violations surface as `ValidationException`
    /// — the same wire shape AWS uses for malformed input even on operations
    /// whose `errors:` set doesn't list it explicitly.
    ///
    /// The call succeeds with an empty JSON object whether or not anything
    /// was removed: an unknown name, or a name stored under a different
    /// account, is silently left alone.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by the validation helpers when `Name`
    /// or `Account` is missing, not a string, or out of its length bounds.
    pub(super) fn delete_partner_event_source(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let payload = req.json_body();

        // Presence of both fields is checked before either is inspected.
        validate_required("Name", &payload["Name"])?;
        validate_required("Account", &payload["Account"])?;

        let name = payload["Name"]
            .as_str()
            .map(str::to_owned)
            .ok_or_else(|| missing("Name"))?;
        validate_string_length("Name", &name, 1, 256)?;

        let account = payload["Account"]
            .as_str()
            .map(str::to_owned)
            .ok_or_else(|| missing("Account"))?;
        validate_string_length("Account", &account, 12, 12)?;

        let mut guard = self.state.write();
        let account_state = guard.get_or_create(&req.account_id);

        // Only delete when the stored source is tied to the supplied account.
        let owned_by_account = account_state
            .partner_event_sources
            .get(&name)
            .map_or(false, |source| source.account == account);
        if owned_by_account {
            account_state.partner_event_sources.remove(&name);
        }

        Ok(AwsResponse::ok_json(json!({})))
    }

    /// Handles `DescribePartnerEventSource`: returns the `Arn` and `Name` of a
    /// stored partner event source.
    ///
    /// `Name` is a required string of 1–256 characters. When the requesting
    /// account has no state yet, a fresh empty state is consulted instead, so
    /// the lookup ends in the not-found error.
    ///
    /// # Errors
    ///
    /// Propagates validation errors for `Name`, and returns a 404
    /// `ResourceNotFoundException` when no source with that name is stored.
    pub(super) fn describe_partner_event_source(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let payload = req.json_body();
        let name_value = &payload["Name"];
        validate_required("Name", name_value)?;
        let name = name_value
            .as_str()
            .map(str::to_owned)
            .ok_or_else(|| missing("Name"))?;
        validate_string_length("name", &name, 1, 256)?;

        let guard = self.state.read();
        // Stand-in used when the account has never been written to.
        let fallback = EventBridgeState::new(&req.account_id, &req.region);
        let account_state = guard.get(&req.account_id).unwrap_or(&fallback);

        match account_state.partner_event_sources.get(&name) {
            Some(source) => {
                let response_body = json!({
                    "Arn": source.arn,
                    "Name": source.name,
                });
                Ok(AwsResponse::ok_json(response_body))
            }
            None => Err(AwsServiceError::aws_error(
                StatusCode::NOT_FOUND,
                "ResourceNotFoundException",
                format!("Partner event source {name} does not exist."),
            )),
        }
    }

    /// Handles `ListPartnerEventSources`: lists the `Arn` and `Name` of every
    /// stored partner event source whose name starts with `NamePrefix`.
    ///
    /// `NamePrefix` is required (1–256 characters). `Limit` is optional, must
    /// lie in 1..=100 and defaults to 100; `NextToken` is optional
    /// (1–2048 characters). The matching entries are handed to `paginate`,
    /// and a `NextToken` is included in the response only when `paginate`
    /// yields one.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by the validation helpers for any field
    /// that is missing or out of bounds.
    pub(super) fn list_partner_event_sources(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let payload = req.json_body();

        validate_required("namePrefix", &payload["NamePrefix"])?;
        let prefix = payload["NamePrefix"]
            .as_str()
            .ok_or_else(|| missing("NamePrefix"))?;
        validate_string_length("namePrefix", prefix, 1, 256)?;

        let requested_limit = payload["Limit"].as_i64();
        validate_optional_range_i64("limit", requested_limit, 1, 100)?;
        let start_token = payload["NextToken"].as_str();
        validate_optional_string_length("nextToken", start_token, 1, 2048)?;
        // Validated above to be within 1..=100 when present, so the cast is lossless.
        let page_size = requested_limit.unwrap_or(100) as usize;

        let guard = self.state.read();
        let fallback = EventBridgeState::new(&req.account_id, &req.region);
        let account_state = guard.get(&req.account_id).unwrap_or(&fallback);

        let mut matching: Vec<Value> = Vec::new();
        for source in account_state.partner_event_sources.values() {
            if source.name.starts_with(prefix) {
                matching.push(json!({
                    "Arn": source.arn,
                    "Name": source.name,
                }));
            }
        }

        let (page, following_token) = paginate(&matching, start_token, page_size);
        let mut response_body = json!({ "PartnerEventSources": page });
        match following_token {
            Some(token) => response_body["NextToken"] = json!(token),
            None => {}
        }

        Ok(AwsResponse::ok_json(response_body))
    }

    /// Handles `ListPartnerEventSourceAccounts`: reports the account attached
    /// to the partner event source named `EventSourceName`.
    ///
    /// Each entry of `PartnerEventSourceAccounts` carries `Account`,
    /// `CreationTime` (epoch seconds, as in `describe_event_source`) and
    /// `State`, following the AWS `PartnerEventSourceAccount` shape.
    /// `ExpirationTime` is not emitted.
    ///
    /// `EventSourceName` is required (1–256 characters). `Limit` (1..=100)
    /// and `NextToken` (1–2048 characters) are validated when present but not
    /// otherwise applied: the full list of matches is always returned and no
    /// `NextToken` is ever produced. An unknown name yields an empty list
    /// rather than an error.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by the validation helpers for any field
    /// that is missing or out of bounds.
    pub(super) fn list_partner_event_source_accounts(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let body = req.json_body();
        validate_required("EventSourceName", &body["EventSourceName"])?;
        let event_source_name = body["EventSourceName"]
            .as_str()
            .ok_or_else(|| missing("EventSourceName"))?;
        validate_string_length("eventSourceName", event_source_name, 1, 256)?;
        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;

        let accounts = self.state.read();
        let empty = EventBridgeState::new(&req.account_id, &req.region);
        let state = accounts.get(&req.account_id).unwrap_or(&empty);
        // Named separately from the lock guard above so the two are not confused.
        let source_accounts: Vec<Value> = state
            .partner_event_sources
            .values()
            .filter(|ps| ps.name == event_source_name)
            .map(|ps| {
                json!({
                    "Account": ps.account,
                    "CreationTime": ps.creation_time.timestamp() as f64,
                    "State": ps.state,
                })
            })
            .collect();

        Ok(AwsResponse::ok_json(json!({
            "PartnerEventSourceAccounts": source_accounts
        })))
    }

    pub(super) fn activate_event_source(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let body = req.json_body();
        validate_required("Name", &body["Name"])?;
        let name = body["Name"]
            .as_str()
            .ok_or_else(|| missing("Name"))?
            .to_string();

        let mut accounts = self.state.write();
        let state = accounts.get_or_create(&req.account_id);
        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::NOT_FOUND,
                "ResourceNotFoundException",
                format!("Event source {name} does not exist."),
            )
        })?;
        ps.state = "ACTIVE".to_string();

        Ok(AwsResponse::ok_json(json!({})))
    }

    pub(super) fn deactivate_event_source(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let body = req.json_body();
        validate_required("Name", &body["Name"])?;
        let name = body["Name"]
            .as_str()
            .ok_or_else(|| missing("Name"))?
            .to_string();

        let mut accounts = self.state.write();
        let state = accounts.get_or_create(&req.account_id);
        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::NOT_FOUND,
                "ResourceNotFoundException",
                format!("Event source {name} does not exist."),
            )
        })?;
        ps.state = "INACTIVE".to_string();

        Ok(AwsResponse::ok_json(json!({})))
    }

    pub(super) fn describe_event_source(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let body = req.json_body();
        validate_required("Name", &body["Name"])?;
        let name = body["Name"]
            .as_str()
            .ok_or_else(|| missing("Name"))?
            .to_string();

        let accounts = self.state.read();
        let empty = EventBridgeState::new(&req.account_id, &req.region);
        let state = accounts.get(&req.account_id).unwrap_or(&empty);
        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::NOT_FOUND,
                "ResourceNotFoundException",
                format!("Event source {name} does not exist."),
            )
        })?;

        Ok(AwsResponse::ok_json(json!({
            "Arn": ps.arn,
            "Name": ps.name,
            "CreatedBy": ps.account,
            "CreationTime": ps.creation_time.timestamp() as f64,
            "State": ps.state,
        })))
    }

    /// Handles `ListEventSources`: lists stored partner event sources,
    /// optionally restricted to names starting with `NamePrefix`.
    ///
    /// All inputs are optional: `NamePrefix` (1–256 characters), `Limit`
    /// (1..=100, default 100) and `NextToken` (1–2048 characters). Each entry
    /// carries `Arn`, `Name`, `CreatedBy`, `CreationTime` (epoch seconds) and
    /// `State`. The entries are handed to `paginate`, and a `NextToken` is
    /// included in the response only when `paginate` yields one.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by the validation helpers for any field
    /// that is out of bounds.
    pub(super) fn list_event_sources(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let payload = req.json_body();

        let prefix = payload["NamePrefix"].as_str();
        let requested_limit = payload["Limit"].as_i64();
        let start_token = payload["NextToken"].as_str();

        validate_optional_string_length("namePrefix", prefix, 1, 256)?;
        validate_optional_range_i64("limit", requested_limit, 1, 100)?;
        validate_optional_string_length("nextToken", start_token, 1, 2048)?;
        // Validated above to be within 1..=100 when present, so the cast is lossless.
        let page_size = requested_limit.unwrap_or(100) as usize;

        let guard = self.state.read();
        let fallback = EventBridgeState::new(&req.account_id, &req.region);
        let account_state = guard.get(&req.account_id).unwrap_or(&fallback);

        let mut matching: Vec<Value> = Vec::new();
        for source in account_state.partner_event_sources.values() {
            // Without a prefix every source qualifies.
            let selected = prefix.map_or(true, |p| source.name.starts_with(p));
            if !selected {
                continue;
            }
            matching.push(json!({
                "Arn": source.arn,
                "Name": source.name,
                "CreatedBy": source.account,
                "CreationTime": source.creation_time.timestamp() as f64,
                "State": source.state,
            }));
        }

        let (page, following_token) = paginate(&matching, start_token, page_size);
        let mut response_body = json!({ "EventSources": page });
        match following_token {
            Some(token) => response_body["NextToken"] = json!(token),
            None => {}
        }

        Ok(AwsResponse::ok_json(response_body))
    }

    pub(super) fn put_partner_events(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let body = req.json_body();
        validate_required("Entries", &body["Entries"])?;
        let entries = body["Entries"]
            .as_array()
            .ok_or_else(|| missing("Entries"))?;

        let mut result_entries = Vec::new();
        for _entry in entries {
            let event_id = uuid::Uuid::new_v4().to_string();
            result_entries.push(json!({ "EventId": event_id }));
        }

        Ok(AwsResponse::ok_json(json!({
            "FailedEntryCount": 0,
            "Entries": result_entries,
        })))
    }

    // ─── TestEventPattern ────────────────────────────────────────────────
}