run_message_handler

Function run_message_handler 

pub async fn run_message_handler<EventType, EventOutput, F, Fut, R, Msg, Context, Env>(
    message_handler: F,
) -> Result<(), Error>
where
    EventType: for<'de> Deserialize<'de> + RunnableEventType<Msg, R, EventOutput>,
    F: Fn(Msg, Arc<Context>) -> Fut,
    Fut: Future<Output = Result<R>>,
    Msg: DeserializeOwned,
    Context: LambdaContext<Env, EventType> + Debug,
    EventOutput: Serialize,
    Env: Parser + Debug,

Executes a message handler against all the messages received in a batch from an SQS event source mapping.

The run_message_handler function takes care of the following tasks:

  • Executes the Lambda runtime, using lambda_runtime.
  • Sets up tracing to ensure all tracing::<...>!() calls are JSON formatted for consumption by CloudWatch.
  • Processes environment variables and makes them available to your handler.
  • Initialises a shared context object, which is passed to your handler.
  • Deserialises a batch of messages and passes each one to your handler.
  • Processes messages with the concurrency set by the RECORD_CONCURRENCY env var (default: 1, i.e. sequentially).

§Writing a message handler

To write a message handler, you need to define four elements:

  • The Message structure, which defines the shape of the messages that are sent to the SQS queue and then forwarded to your Lambda.
  • The Env structure, which defines the expected environment variables your Lambda will receive.
  • The Context structure, which is built from the Env structure and represents the shared state passed to your message handler. This structure needs to implement the LambdaContext trait.
  • The message_handler function, which accepts a Message and a Context, and performs the desired actions.

§Example

use anyhow::Result;
use async_trait::async_trait;
use clap::Parser;
use serde::Deserialize;
use std::fmt::Debug;
use std::sync::Arc;

use cobalt_aws::lambda::{run_message_handler, Error, LambdaContext, SqsEvent};

#[tokio::main]
async fn main() -> Result<(), Error> {
    run_message_handler(message_handler).await
}

/// The structure of the messages we expect to see on the queue.
#[derive(Debug, Deserialize)]
pub struct Message {
    pub target: String,
}

/// Configuration we receive from environment variables.
///
/// Note: all fields should be tagged with the `#[arg(env)]` attribute.
#[derive(Debug, Parser)]
pub struct Env {
    #[arg(env)]
    pub greeting: String,
}

/// Shared context we build up before invoking the lambda runner.
#[derive(Debug)]
pub struct Context {
    pub greeting: String,
}

#[async_trait]
impl LambdaContext<Env, SqsEvent> for Context {
    /// Initialise a shared context object, which will be
    /// passed to all invocations of the message handler.
    async fn from_env(env: &Env) -> Result<Context> {
        Ok(Context {
            greeting: env.greeting.clone(),
        })
    }
}

/// Process a single message from the SQS queue, within the given context.
async fn message_handler(message: Message, context: Arc<Context>) -> Result<()> {
    tracing::debug!("Message: {:?}", message);
    tracing::debug!("Context: {:?}", context);

    // Log a greeting to the target
    tracing::info!("{}, {}!", context.greeting, message.target);

    Ok(())
}

§Concurrent processing

By default, run_message_handler processes the messages in an event batch sequentially. To process messages concurrently, set the RECORD_CONCURRENCY env var (default: 1). This value should not exceed the BatchSize configured for the event source mapping (default: 10).
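
To make the relationship between RECORD_CONCURRENCY and BatchSize concrete, the following is a minimal sketch using only the standard library. The helpers parse_record_concurrency and effective_concurrency are hypothetical and are not part of cobalt_aws; the fallback to 1 for unset, unparsable, or zero values, and the clamping to the batch size, are assumptions made for illustration rather than documented behaviour of the crate.

```rust
/// Hypothetical helper: interpret the raw value of RECORD_CONCURRENCY.
/// Assumption: an unset, unparsable, or zero value falls back to 1
/// (sequential processing). The crate itself may behave differently.
fn parse_record_concurrency(raw: Option<&str>) -> usize {
    raw.and_then(|value| value.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(1)
}

/// Hypothetical helper: cap the requested concurrency at the batch size,
/// since a batch never contains more than `batch_size` messages.
fn effective_concurrency(raw: Option<&str>, batch_size: usize) -> usize {
    parse_record_concurrency(raw).min(batch_size.max(1))
}

/// Hypothetical helper: read the setting from the process environment.
fn concurrency_from_env(batch_size: usize) -> usize {
    let raw = std::env::var("RECORD_CONCURRENCY").ok();
    effective_concurrency(raw.as_deref(), batch_size)
}
```

Under these assumptions, a value larger than the batch size has no extra effect, because at most BatchSize messages are available to process in any one invocation.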

§Error handling

If an error is raised during init or by the message_handler function, the entire message batch is considered to have failed. Error messages are logged to stdout in a format compatible with CloudWatch, and the message batch being processed is returned to the original queue.
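
The all-or-nothing behaviour described above can be illustrated with a small, synchronous sketch using only the standard library. The names HandlerResult, process_batch, and greet are hypothetical and do not come from cobalt_aws. The sketch stops at the first failing message, which is an assumption: the text above does not specify whether the remaining messages still run after a failure.

```rust
/// Hypothetical stand-in for the result type of a per-message handler.
type HandlerResult = Result<(), String>;

/// Runs `handler` over every message in the batch, in order.
/// The first error is returned immediately, which marks the whole
/// batch as failed (assumption: later messages are not attempted).
fn process_batch<M, F>(messages: Vec<M>, mut handler: F) -> HandlerResult
where
    F: FnMut(M) -> HandlerResult,
{
    for message in messages {
        handler(message)?;
    }
    Ok(())
}

/// Hypothetical example handler: rejects messages with an empty target.
fn greet(target: &str) -> HandlerResult {
    if target.is_empty() {
        Err("empty target".to_string())
    } else {
        Ok(())
    }
}
```

Because a failed batch is returned to the queue as a whole, messages that were already handled successfully can be delivered again, so it is generally worth making the handler idempotent.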