kinetics 0.15.1

Kinetics is a hosting platform for Rust applications that allows you to deploy all types of workloads by writing **only Rust code**.
Documentation
use crate::tools::{config::Config as KineticsConfig, resource_name};
use aws_lambda_events::sqs::{BatchItemFailure, SqsBatchResponse, SqsEvent};
use aws_sdk_sqs::operation::send_message::builders::SendMessageFluentBuilder;
use eyre::OptionExt;
use kinetics_parser::ParsedFunction;
use lambda_runtime::LambdaEvent;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{OnceCell, RwLock};

/// Client for sending messages to a worker queue.
///
/// The type is `Clone`, which lets cached instances be handed out by value.
#[derive(Clone)]
pub struct Client {
    /// `SendMessage` request builder used as a template: it is cloned and
    /// completed with a message body on every send.
    queue: SendMessageFluentBuilder,
}

/// Cache of already initialised queue clients, keyed by the worker's type name.
///
/// The map itself is created lazily, on the first call to `Client::from_worker`.
static SQS_CLIENT_CACHE: OnceCell<Arc<RwLock<HashMap<String, Client>>>> = OnceCell::const_new();

/// A queue client
///
/// Used to send items to the worker queue.
impl Client {
    /// Wrap an already configured `SendMessage` builder.
    ///
    /// The builder is used as a template: [`Client::send`] clones it and only
    /// adds the message body.
    pub fn new(queue: SendMessageFluentBuilder) -> Self {
        Client { queue }
    }

    /// Send a message to the queue
    ///
    /// Return Ok(()) if operation succeeds
    ///
    /// # Errors
    ///
    /// Returns the error reported by the underlying `SendMessage` request.
    pub async fn send(
        &self,
        message: impl ::std::convert::Into<::std::string::String>,
    ) -> eyre::Result<()> {
        // Clone the stored builder so it stays reusable for the next message
        self.queue.clone().message_body(message).send().await?;
        Ok(())
    }

    /// Init the client from the reference to worker function
    ///
    /// The client is initialised just once per worker and then reused. Clients
    /// are cached under the worker's type name (`std::any::type_name_of_val`);
    /// the part of that name before the first `::` is used as the project name
    /// and the rest as the function path.
    ///
    /// Environment variables read during initialisation:
    ///
    /// - `AWS_REGION` — falls back to `us-east-1`.
    /// - `KINETICS_QUEUE_ENDPOINT_URL` — falls back to
    ///   `https://sqs.{region}.amazonaws.com`.
    /// - `KINETICS_IS_LOCAL` — when set, the AWS config endpoint is overridden
    ///   with the queue endpoint URL.
    /// - `KINETICS_QUEUE_NAME` — when not set, the name is generated by
    ///   `resource_name` from `KINETICS_USERNAME`, the project name and the
    ///   function name.
    /// - `KINETICS_CLOUD_ACCOUNT_ID` — required.
    ///
    /// # Errors
    ///
    /// Returns an error (instead of panicking) when:
    ///
    /// - the worker's type name contains no `::` separator,
    /// - `KINETICS_QUEUE_NAME` is not set and `KINETICS_USERNAME` is not set either,
    /// - `KINETICS_CLOUD_ACCOUNT_ID` is not set.
    pub async fn from_worker<'a, Fut>(
        worker: impl Fn(Vec<Record>, &'a HashMap<String, String>, &'a KineticsConfig) -> Fut,
    ) -> eyre::Result<Self>
    where
        Fut:
            std::future::Future<Output = Result<Retries, Box<dyn std::error::Error + Send + Sync>>>,
    {
        let cache_key = std::any::type_name_of_val(&worker);

        let cache = SQS_CLIENT_CACHE
            .get_or_init(|| async { Arc::new(RwLock::new(HashMap::new())) })
            .await;

        // Fast path: an already initialised client only needs the shared read lock
        {
            let read_guard = cache.read().await;

            if let Some(client) = read_guard.get(cache_key) {
                return Ok(client.clone());
            }
        }

        // Slow path: hold the write lock for the whole initialisation, so the
        // client of a given worker is built at most once
        let mut write_guard = cache.write().await;

        // Another task may have initialised the client while we waited for the lock
        if let Some(client) = write_guard.get(cache_key) {
            return Ok(client.clone());
        }

        let client = Client {
            queue: {
                let (project_name, function_path) = cache_key
                    .split_once("::")
                    .ok_or_eyre("Failed to get the project name from a worker")?;

                let region =
                    std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string());

                let queue_endpoint_url = std::env::var("KINETICS_QUEUE_ENDPOINT_URL")
                    .unwrap_or_else(|_| format!("https://sqs.{region}.amazonaws.com"));

                let config = if std::env::var("KINETICS_IS_LOCAL").is_ok() {
                    // Redefine endpoint in local mode
                    aws_config::defaults(aws_config::BehaviorVersion::latest())
                        .endpoint_url(&queue_endpoint_url)
                        .load()
                        .await
                } else {
                    aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await
                };

                // Use a hardcoded queue name for local invocations, otherwise generate the name
                // out of user and project names
                let queue_name = match std::env::var("KINETICS_QUEUE_NAME") {
                    Ok(name) => name,
                    Err(_) => {
                        let username = std::env::var("KINETICS_USERNAME")
                            .ok()
                            .ok_or_eyre("KINETICS_USERNAME is not set")?;

                        resource_name(
                            &username,
                            project_name,
                            &ParsedFunction::path_to_name(&function_path.replace("::", "/")),
                        )
                    }
                };

                let account_id = std::env::var("KINETICS_CLOUD_ACCOUNT_ID")
                    .ok()
                    .ok_or_eyre("KINETICS_CLOUD_ACCOUNT_ID is not set")?;

                aws_sdk_sqs::Client::new(&config)
                    .send_message()
                    // Create a full queue URL in a known format:
                    // https://sqs.us-east-1.amazonaws.com/000000000000/kinetics-queue-name
                    .queue_url(format!("{queue_endpoint_url}/{account_id}/{queue_name}"))
            },
        };

        write_guard.insert(cache_key.to_string(), client.clone());
        Ok(client)
    }
}

/// Items to be retried by worker queue
///
/// Worker function must return a Retries struct with ids of items that need
/// to be retried.
#[derive(Default)]
pub struct Retries {
    /// Identifiers of the items to retry, in the order they were added.
    ids: Vec<String>,
}

impl Retries {
    /// Create an empty list of retries.
    pub fn new() -> Self {
        Self::default()
    }

    /// Mark the item with the given id as one that has to be retried.
    pub fn add(&mut self, item: &str) {
        self.ids.push(item.to_owned());
    }

    /// Build the batch response reporting every added id as a failed item.
    pub fn collect(&self) -> SqsBatchResponse {
        let mut response = SqsBatchResponse::default();

        response
            .batch_item_failures
            .extend(self.ids.iter().map(|id| {
                // BatchItemFailure is non_exhaustive, so start from Default
                // and fill in the identifier afterwards
                let mut failure = BatchItemFailure::default();
                failure.item_identifier = id.clone();
                failure
            }));

        response
    }
}

/// A record received from a queue
#[derive(Deserialize, Serialize, Debug)]
pub struct Record {
    /// Identifier of the message; falls back to `None` when the field is
    /// missing from the deserialized input.
    #[serde(default)]
    pub message_id: Option<String>,

    /// Body of the message; falls back to `None` when the field is missing
    /// from the deserialized input.
    #[serde(default)]
    pub body: Option<String>,
}

impl Record {
    /// Convert an SQS Lambda event into a list of records.
    ///
    /// Only the `message_id` and `body` of every message in the event payload
    /// are kept, in the order of the incoming records. The event is taken by
    /// value, so the strings are moved into the records instead of being cloned.
    ///
    /// # Errors
    ///
    /// The current implementation never fails; the `Result` is kept so the
    /// signature stays unchanged for callers.
    pub fn from_sqsevent(event: LambdaEvent<SqsEvent>) -> eyre::Result<Vec<Record>> {
        Ok(event
            .payload
            .records
            .into_iter()
            .map(|message| Record {
                message_id: message.message_id,
                body: message.body,
            })
            .collect())
    }
}