Skip to main content

blueprint_tangle_extra/
consumer.rs

1//! Tangle Consumer
2//!
3//! Consumes [`JobResult`]s and submits them to the Tangle contract.
4
5use crate::extract;
6use alloy_primitives::Bytes;
7use blueprint_client_tangle::TangleClient;
8use blueprint_core::JobResult;
9use blueprint_core::error::BoxError;
10use blueprint_std::boxed::Box;
11use blueprint_std::collections::VecDeque;
12use blueprint_std::format;
13use blueprint_std::string::String;
14use blueprint_std::sync::{Arc, Mutex};
15use core::pin::Pin;
16use core::task::{Context, Poll};
17use futures_util::Sink;
18
19/// Error type for the consumer
20#[derive(Debug, thiserror::Error)]
21pub enum ConsumerError {
22    /// Client error
23    #[error("Client error: {0}")]
24    Client(String),
25    /// Missing metadata
26    #[error("Missing metadata: {0}")]
27    MissingMetadata(&'static str),
28    /// Invalid metadata
29    #[error("Invalid metadata: {0}")]
30    InvalidMetadata(&'static str),
31    /// Transaction error
32    #[error("Transaction error: {0}")]
33    Transaction(String),
34}
35
36/// Derived job result for submission
37struct DerivedJobResult {
38    service_id: u64,
39    call_id: u64,
40    output: Bytes,
41}
42
43enum State {
44    WaitingForResult,
45    ProcessingSubmission(
46        Pin<Box<dyn core::future::Future<Output = Result<(), ConsumerError>> + Send>>,
47    ),
48}
49
50impl State {
51    fn is_waiting(&self) -> bool {
52        matches!(self, State::WaitingForResult)
53    }
54}
55
56/// A consumer of Tangle [`JobResult`]s
57pub struct TangleConsumer {
58    client: Arc<TangleClient>,
59    buffer: Mutex<VecDeque<DerivedJobResult>>,
60    state: Mutex<State>,
61}
62
63impl TangleConsumer {
64    /// Create a new [`TangleConsumer`]
65    pub fn new(client: TangleClient) -> Self {
66        Self {
67            client: Arc::new(client),
68            buffer: Mutex::new(VecDeque::new()),
69            state: Mutex::new(State::WaitingForResult),
70        }
71    }
72
73    /// Get the client
74    #[must_use]
75    pub fn client(&self) -> &TangleClient {
76        &self.client
77    }
78}
79
80impl Sink<JobResult> for TangleConsumer {
81    type Error = BoxError;
82
83    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
84        Poll::Ready(Ok(()))
85    }
86
87    fn start_send(self: Pin<&mut Self>, item: JobResult) -> Result<(), Self::Error> {
88        let JobResult::Ok { head, body } = &item else {
89            // We don't care about errors here
90            blueprint_core::trace!(target: "tangle-consumer", "Discarding job result with error");
91            return Ok(());
92        };
93
94        let (Some(call_id_raw), Some(service_id_raw)) = (
95            head.metadata.get(extract::CallId::METADATA_KEY),
96            head.metadata.get(extract::ServiceId::METADATA_KEY),
97        ) else {
98            // Not a tangle job result
99            blueprint_core::trace!(target: "tangle-consumer", "Discarding job result with missing metadata");
100            return Ok(());
101        };
102
103        blueprint_core::debug!(target: "tangle-consumer", result = ?item, "Received job result, handling...");
104
105        let call_id: u64 = call_id_raw
106            .try_into()
107            .map_err(|_| ConsumerError::InvalidMetadata("call_id"))?;
108        let service_id: u64 = service_id_raw
109            .try_into()
110            .map_err(|_| ConsumerError::InvalidMetadata("service_id"))?;
111
112        self.get_mut()
113            .buffer
114            .lock()
115            .unwrap()
116            .push_back(DerivedJobResult {
117                service_id,
118                call_id,
119                output: Bytes::copy_from_slice(body),
120            });
121        Ok(())
122    }
123
124    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
125        let consumer = self.get_mut();
126        let mut state = consumer.state.lock().unwrap();
127
128        {
129            let buffer = consumer.buffer.lock().unwrap();
130            if buffer.is_empty() && state.is_waiting() {
131                return Poll::Ready(Ok(()));
132            }
133        }
134
135        loop {
136            match &mut *state {
137                State::WaitingForResult => {
138                    let result = {
139                        let mut buffer = consumer.buffer.lock().unwrap();
140                        buffer.pop_front()
141                    };
142
143                    let Some(DerivedJobResult {
144                        service_id,
145                        call_id,
146                        output,
147                    }) = result
148                    else {
149                        return Poll::Ready(Ok(()));
150                    };
151
152                    let client = Arc::clone(&consumer.client);
153                    let fut = Box::pin(async move {
154                        submit_result(client, service_id, call_id, output).await
155                    });
156
157                    *state = State::ProcessingSubmission(fut);
158                }
159                State::ProcessingSubmission(future) => match future.as_mut().poll(cx) {
160                    Poll::Ready(Ok(())) => {
161                        *state = State::WaitingForResult;
162                    }
163                    Poll::Ready(Err(e)) => {
164                        *state = State::WaitingForResult;
165                        return Poll::Ready(Err(e.into()));
166                    }
167                    Poll::Pending => return Poll::Pending,
168                },
169            }
170        }
171    }
172
173    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
174        let buffer = self.buffer.lock().unwrap();
175        if buffer.is_empty() {
176            Poll::Ready(Ok(()))
177        } else {
178            Poll::Pending
179        }
180    }
181}
182
183/// Submit a result to the Tangle contract
184async fn submit_result(
185    client: Arc<TangleClient>,
186    service_id: u64,
187    call_id: u64,
188    output: Bytes,
189) -> Result<(), ConsumerError> {
190    blueprint_core::debug!(
191        target: "tangle-consumer",
192        "Submitting result for service {} call {}",
193        service_id,
194        call_id
195    );
196
197    if client.config.dry_run {
198        blueprint_core::info!(
199            target: "tangle-consumer",
200            "Dry run enabled; skipping on-chain result submission for service {} call {}",
201            service_id,
202            call_id
203        );
204        return Ok(());
205    }
206
207    let result = client
208        .submit_result(service_id, call_id, output)
209        .await
210        .map_err(|e| ConsumerError::Transaction(format!("Failed to submit result: {e}")))?;
211
212    if result.success {
213        blueprint_core::info!(
214            target: "tangle-consumer",
215            "Successfully submitted result for service {} call {}: tx_hash={:?}",
216            service_id,
217            call_id,
218            result.tx_hash
219        );
220        Ok(())
221    } else {
222        Err(ConsumerError::Transaction(format!(
223            "Transaction reverted for service {} call {}: tx_hash={:?}",
224            service_id, call_id, result.tx_hash
225        )))
226    }
227}