use async_trait::async_trait;
use chrono::{DateTime, Utc};
use cloudevents::{AttributesReader, Data, Event, EventBuilder, EventBuilderV10};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tracing::*;
use crate::datetime_serde::default_dt_formatter;
/// Name of the CloudEvent extension attribute that redirects input loading.
///
/// When an incoming event carries this extension, `ProofHandler::handle` treats its value as a
/// URL, downloads the body with an HTTP GET and parses it as the JSON prover input instead of
/// reading the event payload.
///
/// NOTE(review): the value includes a `ce-` prefix. HTTP bindings for CloudEvents commonly strip
/// that prefix from header names before storing extensions — confirm that producers really set
/// the extension under this exact name.
pub const READ_INPUT_FROM_URL_HEADER: &str = "ce-inputurl";
/// Outcome of a proof attempt as reported in the `status` field of the response payload.
///
/// The value is cast to `u8` when the response is built, so the explicit discriminants are part
/// of the wire format and must not be renumbered. `#[repr(u8)]` makes that cast lossless by
/// construction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum ProofStatus {
    /// The proof finished and its output was serialized into the response.
    Success = 2,
    /// The input could not be loaded, or the proof returned an error or panicked.
    Failure = 3,
}
/// JSON payload attached to the result event produced by `ProofHandler::handle`.
#[derive(Serialize, Deserialize)]
struct ProofResponse {
/// Identifier of the incoming event, copied verbatim so the result can be correlated with it.
task_id: String,
/// On success, the proof output serialized as a JSON string; on failure, either the serialized
/// proof error or a plain-text description of what went wrong.
result: String,
/// Numeric form of `ProofStatus` (the enum discriminant cast to `u8`).
status: u8,
/// UTC timestamp taken when handling of the event began.
/// Serialized through the project's `default_dt_formatter` module (format defined there).
#[serde(with = "default_dt_formatter")]
started: DateTime<Utc>,
/// UTC timestamp taken right before the response payload was assembled.
/// Serialized through the project's `default_dt_formatter` module (format defined there).
#[serde(with = "default_dt_formatter")]
finished: DateTime<Utc>,
}
#[async_trait]
pub trait ProofHandler {
type Input: DeserializeOwned + Send + 'static;
type Output: Serialize + Send + 'static;
type Error: Serialize + Send + 'static;
async fn prove(data: Self::Input) -> Result<Self::Output, Self::Error>;
#[instrument(skip(event), fields(task_id = event.id()))] async fn handle(event: Event) -> Result<Event, actix_web::Error> {
let started = Utc::now();
trace!("Start time {started}");
let input_url = event.extension(READ_INPUT_FROM_URL_HEADER);
let result_with_input = match input_url {
Some(url) => {
let url_str = url.to_string();
match reqwest::get(&url_str).await {
Ok(response) => match response.text().await {
Ok(text) => serde_json::from_str::<Self::Input>(&text).map_err(|err| {
format!("Failed to parse JSON from prover input URL: {url_str}; err: {err:?}")
}),
Err(err) => Err(format!(
"Failed to read HTTP response body from prover input URL: {url_str}. Error: {err:?}",
)),
},
Err(err) => Err(format!(
"Failed to fetch prover input from URL: {url_str}. Network error: {err:?}",
)),
}
}
None => {
event
.data()
.ok_or("Event payload is missing".to_string())
.and_then(|data| {
match data {
Data::Binary(v) => serde_json::from_slice(v),
Data::String(v) => serde_json::from_str(v),
Data::Json(v) => serde_json::from_value(v.clone()),
}
.map_err(|err| format!("Failed to parse Json from event payload. Error: {err:?}"))
})
}
};
let (result, status) = match result_with_input {
Ok(input) => {
let info_span = info_span!("prove");
let result = tokio::spawn(async move { Self::prove(input).instrument(info_span).await }).await;
match result {
Ok(prove_result) => match prove_result {
Ok(proof) => (
serde_json::to_string(&proof).map_err(|err| {
error!("Error while serializing success output: {err:?}");
err
})?,
ProofStatus::Success,
),
Err(error) => (
serde_json::to_string(&error).map_err(|err| {
error!("Error while serializing prove error: {err:?}");
err
})?,
ProofStatus::Failure,
),
},
Err(err) => match err.try_into_panic() {
Ok(panic) => {
let panic_message = panic
.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| panic.downcast_ref::<&str>().copied())
.unwrap_or("A panic occurred during proof.");
(panic_message.to_string(), ProofStatus::Failure)
}
Err(join_err) => (
format!("Unexpected error while waiting for proof: {join_err:?}"),
ProofStatus::Failure,
),
},
}
}
Err(handle_request_err) => {
error!(handle_request_err);
(handle_request_err.to_string(), ProofStatus::Failure)
}
};
let response = serde_json::to_value(ProofResponse {
task_id: event.id().to_string(),
result,
status: status as u8,
started,
finished: Utc::now(),
})?;
let source = format!("prover-{}", event.ty());
EventBuilderV10::new()
.id(event.id())
.source(source)
.ty("tenant-service-result") .data("application/json", response)
.build()
.map_err(actix_web::error::ErrorInternalServerError)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use mockito::Server;
    use serde_json::json;

    /// Handler whose proof echoes the input name back as the result.
    struct MyHandler {}

    /// Handler whose proof always panics.
    struct MyHandlerWithPanic {}

    #[derive(Deserialize)]
    struct MyInput {
        name: String,
    }

    #[derive(Serialize)]
    struct MyOutput {
        result: String,
    }

    #[async_trait]
    impl ProofHandler for MyHandler {
        type Input = MyInput;
        type Output = MyOutput;
        type Error = String;

        async fn prove(data: MyInput) -> Result<MyOutput, String> {
            Ok(MyOutput { result: data.name })
        }
    }

    #[async_trait]
    impl ProofHandler for MyHandlerWithPanic {
        type Input = MyInput;
        type Output = MyOutput;
        type Error = ();

        async fn prove(_data: MyInput) -> Result<MyOutput, ()> {
            panic!("Houston, we have a problem")
        }
    }

    /// Starts an event builder pre-filled with the id and source shared by every test event.
    fn event_builder(ty: &str) -> EventBuilderV10 {
        EventBuilderV10::new()
            .id("test_id")
            .source("test://source")
            .ty(ty)
    }

    /// Builds an event without payload that points the handler at `url` for its input.
    fn url_event(url: &str) -> Event {
        event_builder("12345678")
            .extension(READ_INPUT_FROM_URL_HEADER, url)
            .build()
            .unwrap()
    }

    /// Starts a mock HTTP server answering exactly one `GET path` with a 200 JSON `body`.
    ///
    /// The server is created on a dedicated thread, exactly as the tests did before this helper
    /// existed — presumably because the blocking `Server::new()` must not run inside the async
    /// test runtime (confirm against the mockito documentation).
    fn mock_input_server(
        path: &'static str,
        body: &'static str,
    ) -> (mockito::ServerGuard, mockito::Mock) {
        std::thread::spawn(move || {
            let mut server = Server::new();
            let mock = server
                .mock("GET", path)
                .with_status(200)
                .with_header("content-type", "application/json")
                .with_body(body)
                .expect(1)
                .create();
            (server, mock)
        })
        .join()
        .unwrap()
    }

    /// Extracts the `ProofResponse` from the JSON payload of a result event.
    fn get_payload(event: Event) -> ProofResponse {
        if let Some(Data::Json(v)) = event.data() {
            serde_json::from_value::<ProofResponse>(v.clone()).unwrap()
        } else {
            panic!("Expected JSON data in the event.");
        }
    }

    #[actix_rt::test]
    async fn test_prover_handles_valid_event() {
        let mock_event = event_builder("12345678")
            .data("application/json", json!({"name": "aloha"}))
            .build()
            .unwrap();

        let event = MyHandler::handle(mock_event)
            .await
            .expect("handler should return a result event");

        assert_eq!(event.source().to_string(), "prover-12345678");
        assert_eq!(event.ty().to_string(), "tenant-service-result");
        let response = get_payload(event);
        assert_eq!(response.status, ProofStatus::Success as u8);
        assert_eq!(response.result, "{\"result\":\"aloha\"}");
    }

    #[actix_rt::test]
    async fn test_prover_handles_event_wrong_input() {
        let mock_event = event_builder("test.type")
            .data("application/json", json!({"wrong_key": "aloha"}))
            .build()
            .unwrap();

        let event = MyHandler::handle(mock_event)
            .await
            .expect("handler should return a result event");

        let response = get_payload(event);
        assert_eq!(response.status, ProofStatus::Failure as u8);
        assert!(response.result.contains("missing field `name`"));
    }

    #[actix_rt::test]
    async fn test_prover_handles_event_missing_input() {
        let mock_event = event_builder("test.type").build().unwrap();

        let event = MyHandler::handle(mock_event)
            .await
            .expect("handler should return a result event");

        let response = get_payload(event);
        assert_eq!(response.status, ProofStatus::Failure as u8);
        assert!(response.result.contains("Event payload is missing"));
    }

    #[actix_rt::test]
    async fn test_prover_handles_invalid_json_input() {
        let mock_event = event_builder("test.type")
            .data("application/json", "invalid json {")
            .build()
            .unwrap();

        let event = MyHandler::handle(mock_event)
            .await
            .expect("handler should return a result event");

        let response = get_payload(event);
        assert_eq!(response.status, ProofStatus::Failure as u8);
        assert!(
            response.result.contains("Failed to parse Json from event payload"),
            "Expected JSON parsing error message, got: {}",
            response.result
        );
    }

    #[actix_rt::test]
    async fn test_prover_handles_panic() {
        let mock_event = event_builder("12345678")
            .data("application/json", json!({"name": "aloha"}))
            .build()
            .unwrap();

        let event = MyHandlerWithPanic::handle(mock_event)
            .await
            .expect("handler should return a result event");

        let response = get_payload(event);
        assert_eq!(response.status, ProofStatus::Failure as u8);
        assert_eq!(response.result, "Houston, we have a problem");
    }

    #[actix_rt::test]
    async fn test_prover_handles_url_input() {
        let (server, mock) = mock_input_server("/input", r#"{"name": "from_url"}"#);
        let test_url = format!("{}/input", server.url());

        let event = MyHandler::handle(url_event(&test_url))
            .await
            .expect("handler should return a result event");

        assert_eq!(event.source().to_string(), "prover-12345678");
        assert_eq!(event.ty().to_string(), "tenant-service-result");
        let response = get_payload(event);
        assert_eq!(response.status, ProofStatus::Success as u8);
        assert_eq!(response.result, "{\"result\":\"from_url\"}");
        mock.assert();
    }

    #[actix_rt::test]
    async fn test_prover_handles_url_fetch_failure() {
        let test_url = "http://localhost:12345/nonexistent";

        let event = MyHandler::handle(url_event(test_url))
            .await
            .expect("Handler should return Ok even for URL fetch failures");

        let response = get_payload(event);
        assert_eq!(response.status, ProofStatus::Failure as u8);
        assert!(
            response.result.contains("Failed to fetch prover input from URL")
                && response.result.contains("Network error")
                && response.result.contains(test_url),
            "Expected detailed error message about URL fetch failure, got: {}",
            response.result
        );
    }

    #[actix_rt::test]
    async fn test_prover_handles_url_invalid_json() {
        let (server, mock) = mock_input_server("/invalid-json", r#"{"invalid json format"#);
        let test_url = format!("{}/invalid-json", server.url());

        let event = MyHandler::handle(url_event(&test_url))
            .await
            .expect("handler should return a result event");

        let response = get_payload(event);
        assert_eq!(response.status, ProofStatus::Failure as u8);
        assert!(
            response.result.contains("Failed to parse JSON from prover input URL")
                && response.result.contains(&test_url),
            "Expected detailed error about JSON parsing failure, got: {}",
            response.result
        );
        mock.assert();
    }

    #[actix_rt::test]
    async fn test_prover_handles_url_wrong_json_schema() {
        let (server, mock) = mock_input_server("/wrong-schema", r#"{"wrong_field": "value"}"#);
        let test_url = format!("{}/wrong-schema", server.url());

        let event = MyHandler::handle(url_event(&test_url))
            .await
            .expect("handler should return a result event");

        let response = get_payload(event);
        assert_eq!(response.status, ProofStatus::Failure as u8);
        assert!(response.result.contains("missing field `name`"));
        mock.assert();
    }
}