// athena_udf/lib.rs

1pub mod arrow_conversions;
2pub mod process_macro;
3pub mod register_macro;
4pub mod request;
5pub mod response;
6pub mod serde_base64;
7pub mod serialization;
8
9use arrow::record_batch::RecordBatch;
10pub use arrow_conversions::{FromArrow, ToArrow};
11pub use process_macro::UDFProcessor;
12pub use request::{AthenaUDFRequest, Identity, InputRecords, OutputSchemaWrapper, PingRequest};
13pub use response::{AthenaResponse, AthenaUDFResponse, OutputRecords, PingResponse};
14
15pub use lambda_runtime::{run, service_fn, LambdaEvent};
16pub use serde::{Deserialize, Serialize};
17pub use serde_json::Value;
18
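// NOTE: `Error` below is a private import, not a re-export (callers name it `lambda_runtime::Error`);
// presumably the `wrap_response` free function is the item kept for backwards compatibility.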
20use lambda_runtime::Error;
21
22pub fn wrap_response(response: AthenaResponse, is_http: bool) -> Result<Value, Error> {
23    response.wrap_response(is_http)
24}
25
26/// A handler function type for processing UDF requests.
27///
28/// Takes the input batch, method name, and output column name,
29/// and returns a processed RecordBatch.
30pub type UDFHandler = fn(&RecordBatch, &str, &str) -> Result<RecordBatch, Error>;
31
32/// Main entry point for Athena UDF Lambda handlers.
33///
34/// Automatically handles both PingRequest and UserDefinedFunctionRequest,
35/// routing UDF calls to the provided handler function.
36///
37/// # Examples
38///
39/// ```no_run
40/// use athena_udf::*;
41/// use lambda_runtime::{service_fn, run, Error};
42///
43/// fn string_reverse(s: String) -> String {
44///     s.chars().rev().collect()
45/// }
46///
47/// async fn function_handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
48///     handle_athena_request(event, |input_batch, method_name, output_col_name| {
49///         match method_name {
50///             "string_reverse" => UDFProcessor::new(input_batch)
51///                 .process_unary::<String, String, _>(output_col_name, string_reverse),
52///             _ => Err(format!("Unknown function: {}", method_name).into()),
53///         }
54///     }).await
55/// }
56/// ```
57pub async fn handle_athena_request<F>(
58    event: LambdaEvent<Value>,
59    udf_handler: F,
60) -> Result<Value, Error>
61where
62    F: Fn(&RecordBatch, &str, &str) -> Result<RecordBatch, Error>,
63{
64    let (actual_payload, is_http) = AthenaResponse::parse_request(event.payload)?;
65
66    let request_type = actual_payload
67        .get("@type")
68        .and_then(|v| v.as_str())
69        .ok_or("Missing @type field")?;
70
71    let response = match request_type {
72        "PingRequest" => {
73            let ping_req: PingRequest = serde_json::from_value(actual_payload)?;
74            ping_req.handle()
75        }
76        "UserDefinedFunctionRequest" => {
77            let udf_req: AthenaUDFRequest = serde_json::from_value(actual_payload)?;
78            udf_req.process_with(&udf_handler)?
79        }
80        _ => return Err(format!("Unknown request type: {}", request_type).into()),
81    };
82
83    response.wrap_response(is_http)
84}