use crate::extract;
use alloy_primitives::Bytes;
use blueprint_client_tangle::TangleClient;
use blueprint_core::JobResult;
use blueprint_core::error::BoxError;
use blueprint_std::boxed::Box;
use blueprint_std::collections::VecDeque;
use blueprint_std::format;
use blueprint_std::string::String;
use blueprint_std::sync::{Arc, Mutex};
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_util::Sink;
#[derive(Debug, thiserror::Error)]
pub enum ConsumerError {
#[error("Client error: {0}")]
Client(String),
#[error("Missing metadata: {0}")]
MissingMetadata(&'static str),
#[error("Invalid metadata: {0}")]
InvalidMetadata(&'static str),
#[error("Transaction error: {0}")]
Transaction(String),
}
struct DerivedJobResult {
service_id: u64,
call_id: u64,
output: Bytes,
}
enum State {
WaitingForResult,
ProcessingSubmission(
Pin<Box<dyn core::future::Future<Output = Result<(), ConsumerError>> + Send>>,
),
}
impl State {
fn is_waiting(&self) -> bool {
matches!(self, State::WaitingForResult)
}
}
pub struct TangleConsumer {
client: Arc<TangleClient>,
buffer: Mutex<VecDeque<DerivedJobResult>>,
state: Mutex<State>,
}
impl TangleConsumer {
pub fn new(client: TangleClient) -> Self {
Self {
client: Arc::new(client),
buffer: Mutex::new(VecDeque::new()),
state: Mutex::new(State::WaitingForResult),
}
}
#[must_use]
pub fn client(&self) -> &TangleClient {
&self.client
}
}
impl Sink<JobResult> for TangleConsumer {
type Error = BoxError;
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: JobResult) -> Result<(), Self::Error> {
let JobResult::Ok { head, body } = &item else {
blueprint_core::trace!(target: "tangle-consumer", "Discarding job result with error");
return Ok(());
};
let (Some(call_id_raw), Some(service_id_raw)) = (
head.metadata.get(extract::CallId::METADATA_KEY),
head.metadata.get(extract::ServiceId::METADATA_KEY),
) else {
blueprint_core::trace!(target: "tangle-consumer", "Discarding job result with missing metadata");
return Ok(());
};
blueprint_core::debug!(target: "tangle-consumer", result = ?item, "Received job result, handling...");
let call_id: u64 = call_id_raw
.try_into()
.map_err(|_| ConsumerError::InvalidMetadata("call_id"))?;
let service_id: u64 = service_id_raw
.try_into()
.map_err(|_| ConsumerError::InvalidMetadata("service_id"))?;
self.get_mut()
.buffer
.lock()
.unwrap()
.push_back(DerivedJobResult {
service_id,
call_id,
output: Bytes::copy_from_slice(body),
});
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let consumer = self.get_mut();
let mut state = consumer.state.lock().unwrap();
{
let buffer = consumer.buffer.lock().unwrap();
if buffer.is_empty() && state.is_waiting() {
return Poll::Ready(Ok(()));
}
}
loop {
match &mut *state {
State::WaitingForResult => {
let result = {
let mut buffer = consumer.buffer.lock().unwrap();
buffer.pop_front()
};
let Some(DerivedJobResult {
service_id,
call_id,
output,
}) = result
else {
return Poll::Ready(Ok(()));
};
let client = Arc::clone(&consumer.client);
let fut = Box::pin(async move {
submit_result(client, service_id, call_id, output).await
});
*state = State::ProcessingSubmission(fut);
}
State::ProcessingSubmission(future) => match future.as_mut().poll(cx) {
Poll::Ready(Ok(())) => {
*state = State::WaitingForResult;
}
Poll::Ready(Err(e)) => {
*state = State::WaitingForResult;
return Poll::Ready(Err(e.into()));
}
Poll::Pending => return Poll::Pending,
},
}
}
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let buffer = self.buffer.lock().unwrap();
if buffer.is_empty() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
}
async fn submit_result(
client: Arc<TangleClient>,
service_id: u64,
call_id: u64,
output: Bytes,
) -> Result<(), ConsumerError> {
blueprint_core::debug!(
target: "tangle-consumer",
"Submitting result for service {} call {}",
service_id,
call_id
);
if client.config.dry_run {
blueprint_core::info!(
target: "tangle-consumer",
"Dry run enabled; skipping on-chain result submission for service {} call {}",
service_id,
call_id
);
return Ok(());
}
let result = client
.submit_result(service_id, call_id, output)
.await
.map_err(|e| ConsumerError::Transaction(format!("Failed to submit result: {e}")))?;
if result.success {
blueprint_core::info!(
target: "tangle-consumer",
"Successfully submitted result for service {} call {}: tx_hash={:?}",
service_id,
call_id,
result.tx_hash
);
Ok(())
} else {
Err(ConsumerError::Transaction(format!(
"Transaction reverted for service {} call {}: tx_hash={:?}",
service_id, call_id, result.tx_hash
)))
}
}