blueprint-tangle-extra 0.2.0-alpha.2

Producer/Consumer extras for Tangle blueprints
Documentation
//! Tangle Consumer
//!
//! Consumes [`JobResult`]s and submits them to the Tangle contract.

use crate::extract;
use alloy_primitives::Bytes;
use blueprint_client_tangle::TangleClient;
use blueprint_core::JobResult;
use blueprint_core::error::BoxError;
use blueprint_std::boxed::Box;
use blueprint_std::collections::VecDeque;
use blueprint_std::format;
use blueprint_std::string::String;
use blueprint_std::sync::{Arc, Mutex};
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_util::Sink;

/// Error type for the consumer
#[derive(Debug, thiserror::Error)]
pub enum ConsumerError {
    /// Client error
    #[error("Client error: {0}")]
    Client(String),
    /// Missing metadata
    #[error("Missing metadata: {0}")]
    MissingMetadata(&'static str),
    /// Invalid metadata
    #[error("Invalid metadata: {0}")]
    InvalidMetadata(&'static str),
    /// Transaction error
    #[error("Transaction error: {0}")]
    Transaction(String),
}

/// Derived job result for submission
struct DerivedJobResult {
    service_id: u64,
    call_id: u64,
    output: Bytes,
}

enum State {
    WaitingForResult,
    ProcessingSubmission(
        Pin<Box<dyn core::future::Future<Output = Result<(), ConsumerError>> + Send>>,
    ),
}

impl State {
    fn is_waiting(&self) -> bool {
        matches!(self, State::WaitingForResult)
    }
}

/// A consumer of Tangle [`JobResult`]s
pub struct TangleConsumer {
    client: Arc<TangleClient>,
    buffer: Mutex<VecDeque<DerivedJobResult>>,
    state: Mutex<State>,
}

impl TangleConsumer {
    /// Create a new [`TangleConsumer`]
    pub fn new(client: TangleClient) -> Self {
        Self {
            client: Arc::new(client),
            buffer: Mutex::new(VecDeque::new()),
            state: Mutex::new(State::WaitingForResult),
        }
    }

    /// Get the client
    #[must_use]
    pub fn client(&self) -> &TangleClient {
        &self.client
    }
}

impl Sink<JobResult> for TangleConsumer {
    type Error = BoxError;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: JobResult) -> Result<(), Self::Error> {
        let JobResult::Ok { head, body } = &item else {
            // We don't care about errors here
            blueprint_core::trace!(target: "tangle-consumer", "Discarding job result with error");
            return Ok(());
        };

        let (Some(call_id_raw), Some(service_id_raw)) = (
            head.metadata.get(extract::CallId::METADATA_KEY),
            head.metadata.get(extract::ServiceId::METADATA_KEY),
        ) else {
            // Not a tangle job result
            blueprint_core::trace!(target: "tangle-consumer", "Discarding job result with missing metadata");
            return Ok(());
        };

        blueprint_core::debug!(target: "tangle-consumer", result = ?item, "Received job result, handling...");

        let call_id: u64 = call_id_raw
            .try_into()
            .map_err(|_| ConsumerError::InvalidMetadata("call_id"))?;
        let service_id: u64 = service_id_raw
            .try_into()
            .map_err(|_| ConsumerError::InvalidMetadata("service_id"))?;

        self.get_mut()
            .buffer
            .lock()
            .unwrap()
            .push_back(DerivedJobResult {
                service_id,
                call_id,
                output: Bytes::copy_from_slice(body),
            });
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let consumer = self.get_mut();
        let mut state = consumer.state.lock().unwrap();

        {
            let buffer = consumer.buffer.lock().unwrap();
            if buffer.is_empty() && state.is_waiting() {
                return Poll::Ready(Ok(()));
            }
        }

        loop {
            match &mut *state {
                State::WaitingForResult => {
                    let result = {
                        let mut buffer = consumer.buffer.lock().unwrap();
                        buffer.pop_front()
                    };

                    let Some(DerivedJobResult {
                        service_id,
                        call_id,
                        output,
                    }) = result
                    else {
                        return Poll::Ready(Ok(()));
                    };

                    let client = Arc::clone(&consumer.client);
                    let fut = Box::pin(async move {
                        submit_result(client, service_id, call_id, output).await
                    });

                    *state = State::ProcessingSubmission(fut);
                }
                State::ProcessingSubmission(future) => match future.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        *state = State::WaitingForResult;
                    }
                    Poll::Ready(Err(e)) => {
                        *state = State::WaitingForResult;
                        return Poll::Ready(Err(e.into()));
                    }
                    Poll::Pending => return Poll::Pending,
                },
            }
        }
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let buffer = self.buffer.lock().unwrap();
        if buffer.is_empty() {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }
}

/// Submit a result to the Tangle contract
async fn submit_result(
    client: Arc<TangleClient>,
    service_id: u64,
    call_id: u64,
    output: Bytes,
) -> Result<(), ConsumerError> {
    blueprint_core::debug!(
        target: "tangle-consumer",
        "Submitting result for service {} call {}",
        service_id,
        call_id
    );

    if client.config.dry_run {
        blueprint_core::info!(
            target: "tangle-consumer",
            "Dry run enabled; skipping on-chain result submission for service {} call {}",
            service_id,
            call_id
        );
        return Ok(());
    }

    let result = client
        .submit_result(service_id, call_id, output)
        .await
        .map_err(|e| ConsumerError::Transaction(format!("Failed to submit result: {e}")))?;

    if result.success {
        blueprint_core::info!(
            target: "tangle-consumer",
            "Successfully submitted result for service {} call {}: tx_hash={:?}",
            service_id,
            call_id,
            result.tx_hash
        );
        Ok(())
    } else {
        Err(ConsumerError::Transaction(format!(
            "Transaction reverted for service {} call {}: tx_hash={:?}",
            service_id, call_id, result.tx_hash
        )))
    }
}