1pub mod arrow_conversions;
2pub mod process_macro;
3pub mod register_macro;
4pub mod request;
5pub mod response;
6pub mod serde_base64;
7pub mod serialization;
8
9use arrow::record_batch::RecordBatch;
10pub use arrow_conversions::{FromArrow, ToArrow};
11pub use process_macro::UDFProcessor;
12pub use request::{AthenaUDFRequest, Identity, InputRecords, OutputSchemaWrapper, PingRequest};
13pub use response::{AthenaResponse, AthenaUDFResponse, OutputRecords, PingResponse};
14
15pub use lambda_runtime::{run, service_fn, LambdaEvent};
16pub use serde::{Deserialize, Serialize};
17pub use serde_json::Value;
18
19use lambda_runtime::Error;
21
/// Wraps an [`AthenaResponse`] into a JSON [`Value`].
///
/// This is a free-function convenience over the `wrap_response` method on
/// [`AthenaResponse`]; both arguments are forwarded unchanged.
///
/// # Errors
///
/// Propagates whatever error the underlying `wrap_response` method returns.
// NOTE(review): `is_http` presumably selects an HTTP-style envelope for the
// response — confirm against `AthenaResponse::wrap_response`.
pub fn wrap_response(response: AthenaResponse, is_http: bool) -> Result<Value, Error> {
    response.wrap_response(is_http)
}
25
/// Plain function-pointer form of a UDF handler.
///
/// A handler receives a borrowed [`RecordBatch`] plus two string arguments and
/// returns a [`RecordBatch`] or an [`Error`]. [`handle_athena_request`] accepts
/// any `Fn` with this same signature, so values of this type can be passed to it.
// NOTE(review): the meaning of the two `&str` parameters is not visible in
// this file — confirm against `AthenaUDFRequest::process_with`.
pub type UDFHandler = fn(&RecordBatch, &str, &str) -> Result<RecordBatch, Error>;
31
32pub async fn handle_athena_request<F>(
58 event: LambdaEvent<Value>,
59 udf_handler: F,
60) -> Result<Value, Error>
61where
62 F: Fn(&RecordBatch, &str, &str) -> Result<RecordBatch, Error>,
63{
64 let (actual_payload, is_http) = AthenaResponse::parse_request(event.payload)?;
65
66 let request_type = actual_payload
67 .get("@type")
68 .and_then(|v| v.as_str())
69 .ok_or("Missing @type field")?;
70
71 let response = match request_type {
72 "PingRequest" => {
73 let ping_req: PingRequest = serde_json::from_value(actual_payload)?;
74 ping_req.handle()
75 }
76 "UserDefinedFunctionRequest" => {
77 let udf_req: AthenaUDFRequest = serde_json::from_value(actual_payload)?;
78 udf_req.process_with(&udf_handler)?
79 }
80 _ => return Err(format!("Unknown request type: {}", request_type).into()),
81 };
82
83 response.wrap_response(is_http)
84}