pub async fn run_message_handler<EventType, EventOutput, F, Fut, R, Msg, Context, Env>(
message_handler: F,
) -> Result<(), Error>
where
EventType: for<'de> Deserialize<'de> + RunnableEventType<Msg, R, EventOutput>,
F: Fn(Msg, Arc<Context>) -> Fut,
Fut: Future<Output = Result<R>>,
Msg: DeserializeOwned,
Context: LambdaContext<Env, EventType> + Debug,
EventOutput: Serialize,
Env: Parser + Debug,
Executes a message handler against all the messages received in a batch from an SQS event source mapping.
The run_message_handler function takes care of the following tasks:
- Executes the Lambda runtime, using lambda_runtime.
- Sets up tracing to ensure all tracing::<...>!() calls are JSON formatted for consumption by CloudWatch.
- Processes environment variables and makes them available to your handler.
- Initialises a shared context object, which is passed to your handler.
- Deserialises a batch of messages and passes each one to your handler.
- Processes messages concurrently, based on the env var RECORD_CONCURRENCY (default: 1).
§Writing a message handler
To write a message handler, you need to define four elements:
- The Message structure, which defines the structure of the messages that will be sent to the SQS queue and then forwarded on to your Lambda.
- The Env structure, which defines the environment variables your Lambda expects to receive.
- The Context structure, which is constructed from the Env structure and represents the shared state that will be passed into your message handler. This structure needs to implement the LambdaContext trait.
- The message_handler function, which accepts a Message and a shared Context (wrapped in an Arc), and performs the desired actions.
§Example
use anyhow::Result;
use async_trait::async_trait;
use clap::Parser;
use serde::Deserialize;
use std::fmt::Debug;
use std::sync::Arc;

use cobalt_aws::lambda::{run_message_handler, Error, LambdaContext, SqsEvent};

#[tokio::main]
async fn main() -> Result<(), Error> {
    run_message_handler(message_handler).await
}

/// The structure of the messages we expect to see on the queue.
#[derive(Debug, Deserialize)]
pub struct Message {
    pub target: String,
}

/// Configuration we receive from environment variables.
///
/// Note: all fields should be tagged with the `#[arg(env)]` attribute.
#[derive(Debug, Parser)]
pub struct Env {
    #[arg(env)]
    pub greeting: String,
}

/// Shared context we build up before invoking the lambda runner.
#[derive(Debug)]
pub struct Context {
    pub greeting: String,
}

#[async_trait]
impl LambdaContext<Env, SqsEvent> for Context {
    /// Initialise a shared context object, which will be
    /// passed to all instances of the message handler.
    async fn from_env(env: &Env) -> Result<Context> {
        Ok(Context {
            greeting: env.greeting.clone(),
        })
    }
}

/// Process a single message from the SQS queue, within the given context.
async fn message_handler(message: Message, context: Arc<Context>) -> Result<()> {
    tracing::debug!("Message: {:?}", message);
    tracing::debug!("Context: {:?}", context);

    // Log a greeting to the target
    tracing::info!("{}, {}!", context.greeting, message.target);

    Ok(())
}
§Concurrent processing
By default, run_message_handler processes the messages in an event batch sequentially.
You can configure it to process messages concurrently by setting the RECORD_CONCURRENCY
env var (default: 1). This value should not exceed the BatchSize configured for the
event source mapping (default: 10).
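To make the relationship between RECORD_CONCURRENCY and BatchSize concrete, here is a minimal, standard-library-only sketch. It is not how run_message_handler is implemented: the names resolve_concurrency and process_bounded are hypothetical, scoped OS threads stand in for the async runtime, and capping the value at the batch size is an assumption of this sketch rather than documented library behaviour.

```rust
use std::thread;

/// Hypothetical helper: resolve a concurrency level from the raw value of
/// the RECORD_CONCURRENCY env var. Missing, unparsable, or zero values fall
/// back to 1 (sequential). The result is capped at `batch_size`, because a
/// higher value cannot add parallelism within a single batch.
fn resolve_concurrency(raw: Option<&str>, batch_size: usize) -> usize {
    let requested = raw
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|n| *n > 0)
        .unwrap_or(1);
    requested.min(batch_size.max(1))
}

/// Hypothetical helper: apply `handler` to every message, running at most
/// `concurrency` handlers at a time. Messages are processed in chunks, and
/// each chunk must finish before the next one starts. Results are returned
/// in the same order as the input messages.
fn process_bounded<M, F>(
    messages: &[M],
    concurrency: usize,
    handler: F,
) -> Vec<Result<(), String>>
where
    M: Sync,
    F: Fn(&M) -> Result<(), String> + Sync,
{
    // Share the handler by reference so every spawned thread can call it.
    let handler = &handler;
    let mut results = Vec::with_capacity(messages.len());
    for chunk in messages.chunks(concurrency.max(1)) {
        let chunk_results: Vec<Result<(), String>> = thread::scope(|s| {
            // Spawn one scoped thread per message in this chunk.
            let handles: Vec<_> = chunk
                .iter()
                .map(|m| s.spawn(move || handler(m)))
                .collect();
            // Join in spawn order to preserve the input ordering.
            handles
                .into_iter()
                .map(|h| h.join().expect("handler panicked"))
                .collect()
        });
        results.extend(chunk_results);
    }
    results
}
```

In a real deployment you would read the raw value with std::env::var("RECORD_CONCURRENCY").ok() and pass it in as raw.as_deref(). With a BatchSize of 10, any setting above 10 behaves the same as 10 in this sketch.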
§Error handling
If any errors are raised during init, or from the message_handler function, then the entire message
batch will be considered to have failed. Error messages will be logged to stdout in a format compatible
with CloudWatch, and the message batch being processed will be returned to the original queue.
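The all-or-nothing behaviour described above can be illustrated with a small synchronous sketch. This is a simplified model, not the library's code: run_batch is a hypothetical name, errors are plain Strings instead of anyhow errors, and messages are handled sequentially.

```rust
/// Hypothetical model of all-or-nothing batch handling: run `handler` over
/// every message in order, and fail the whole batch on the first error.
/// Messages handled before the failure are NOT rolled back.
fn run_batch<M, F>(messages: Vec<M>, handler: F) -> Result<(), String>
where
    F: Fn(M) -> Result<(), String>,
{
    for message in messages {
        // `?` returns early, so one failing message fails the entire batch.
        handler(message)?;
    }
    Ok(())
}
```

Because a failed batch is returned to the queue as a whole, messages that were handled successfully before the failure will be delivered again. It is therefore worth making message_handler idempotent, so that reprocessing a message is harmless.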