blueprint_tangle_extra/
consumer.rs1use crate::extract;
6use alloy_primitives::Bytes;
7use blueprint_client_tangle::TangleClient;
8use blueprint_core::JobResult;
9use blueprint_core::error::BoxError;
10use blueprint_std::boxed::Box;
11use blueprint_std::collections::VecDeque;
12use blueprint_std::format;
13use blueprint_std::string::String;
14use blueprint_std::sync::{Arc, Mutex};
15use core::pin::Pin;
16use core::task::{Context, Poll};
17use futures_util::Sink;
18
19#[derive(Debug, thiserror::Error)]
21pub enum ConsumerError {
22 #[error("Client error: {0}")]
24 Client(String),
25 #[error("Missing metadata: {0}")]
27 MissingMetadata(&'static str),
28 #[error("Invalid metadata: {0}")]
30 InvalidMetadata(&'static str),
31 #[error("Transaction error: {0}")]
33 Transaction(String),
34}
35
36struct DerivedJobResult {
38 service_id: u64,
39 call_id: u64,
40 output: Bytes,
41}
42
43enum State {
44 WaitingForResult,
45 ProcessingSubmission(
46 Pin<Box<dyn core::future::Future<Output = Result<(), ConsumerError>> + Send>>,
47 ),
48}
49
50impl State {
51 fn is_waiting(&self) -> bool {
52 matches!(self, State::WaitingForResult)
53 }
54}
55
56pub struct TangleConsumer {
58 client: Arc<TangleClient>,
59 buffer: Mutex<VecDeque<DerivedJobResult>>,
60 state: Mutex<State>,
61}
62
63impl TangleConsumer {
64 pub fn new(client: TangleClient) -> Self {
66 Self {
67 client: Arc::new(client),
68 buffer: Mutex::new(VecDeque::new()),
69 state: Mutex::new(State::WaitingForResult),
70 }
71 }
72
73 #[must_use]
75 pub fn client(&self) -> &TangleClient {
76 &self.client
77 }
78}
79
80impl Sink<JobResult> for TangleConsumer {
81 type Error = BoxError;
82
83 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
84 Poll::Ready(Ok(()))
85 }
86
87 fn start_send(self: Pin<&mut Self>, item: JobResult) -> Result<(), Self::Error> {
88 let JobResult::Ok { head, body } = &item else {
89 blueprint_core::trace!(target: "tangle-consumer", "Discarding job result with error");
91 return Ok(());
92 };
93
94 let (Some(call_id_raw), Some(service_id_raw)) = (
95 head.metadata.get(extract::CallId::METADATA_KEY),
96 head.metadata.get(extract::ServiceId::METADATA_KEY),
97 ) else {
98 blueprint_core::trace!(target: "tangle-consumer", "Discarding job result with missing metadata");
100 return Ok(());
101 };
102
103 blueprint_core::debug!(target: "tangle-consumer", result = ?item, "Received job result, handling...");
104
105 let call_id: u64 = call_id_raw
106 .try_into()
107 .map_err(|_| ConsumerError::InvalidMetadata("call_id"))?;
108 let service_id: u64 = service_id_raw
109 .try_into()
110 .map_err(|_| ConsumerError::InvalidMetadata("service_id"))?;
111
112 self.get_mut()
113 .buffer
114 .lock()
115 .unwrap()
116 .push_back(DerivedJobResult {
117 service_id,
118 call_id,
119 output: Bytes::copy_from_slice(body),
120 });
121 Ok(())
122 }
123
124 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
125 let consumer = self.get_mut();
126 let mut state = consumer.state.lock().unwrap();
127
128 {
129 let buffer = consumer.buffer.lock().unwrap();
130 if buffer.is_empty() && state.is_waiting() {
131 return Poll::Ready(Ok(()));
132 }
133 }
134
135 loop {
136 match &mut *state {
137 State::WaitingForResult => {
138 let result = {
139 let mut buffer = consumer.buffer.lock().unwrap();
140 buffer.pop_front()
141 };
142
143 let Some(DerivedJobResult {
144 service_id,
145 call_id,
146 output,
147 }) = result
148 else {
149 return Poll::Ready(Ok(()));
150 };
151
152 let client = Arc::clone(&consumer.client);
153 let fut = Box::pin(async move {
154 submit_result(client, service_id, call_id, output).await
155 });
156
157 *state = State::ProcessingSubmission(fut);
158 }
159 State::ProcessingSubmission(future) => match future.as_mut().poll(cx) {
160 Poll::Ready(Ok(())) => {
161 *state = State::WaitingForResult;
162 }
163 Poll::Ready(Err(e)) => {
164 *state = State::WaitingForResult;
165 return Poll::Ready(Err(e.into()));
166 }
167 Poll::Pending => return Poll::Pending,
168 },
169 }
170 }
171 }
172
173 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
174 let buffer = self.buffer.lock().unwrap();
175 if buffer.is_empty() {
176 Poll::Ready(Ok(()))
177 } else {
178 Poll::Pending
179 }
180 }
181}
182
183async fn submit_result(
185 client: Arc<TangleClient>,
186 service_id: u64,
187 call_id: u64,
188 output: Bytes,
189) -> Result<(), ConsumerError> {
190 blueprint_core::debug!(
191 target: "tangle-consumer",
192 "Submitting result for service {} call {}",
193 service_id,
194 call_id
195 );
196
197 if client.config.dry_run {
198 blueprint_core::info!(
199 target: "tangle-consumer",
200 "Dry run enabled; skipping on-chain result submission for service {} call {}",
201 service_id,
202 call_id
203 );
204 return Ok(());
205 }
206
207 let result = client
208 .submit_result(service_id, call_id, output)
209 .await
210 .map_err(|e| ConsumerError::Transaction(format!("Failed to submit result: {e}")))?;
211
212 if result.success {
213 blueprint_core::info!(
214 target: "tangle-consumer",
215 "Successfully submitted result for service {} call {}: tx_hash={:?}",
216 service_id,
217 call_id,
218 result.tx_hash
219 );
220 Ok(())
221 } else {
222 Err(ConsumerError::Transaction(format!(
223 "Transaction reverted for service {} call {}: tx_hash={:?}",
224 service_id, call_id, result.tx_hash
225 )))
226 }
227}