fuel_core_shared_sequencer/
service.rs1use crate::{
4 Client,
5 Config,
6 http_api::AccountMetadata,
7 ports::{
8 BlocksProvider,
9 Signer,
10 },
11};
12use async_trait::async_trait;
13use core::time::Duration;
14use fuel_core_services::{
15 EmptyShared,
16 RunnableService,
17 RunnableTask,
18 ServiceRunner,
19 StateWatcher,
20 TaskNextAction,
21 stream::BoxStream,
22};
23use fuel_core_types::services::{
24 block_importer::SharedImportResult,
25 shared_sequencer::{
26 SSBlob,
27 SSBlobs,
28 },
29};
30use futures::StreamExt;
31use std::sync::Arc;
32
33pub struct NonInitializedTask<S> {
35 config: Config,
36 signer: Arc<S>,
37 blocks_events: BoxStream<SharedImportResult>,
38}
39
40pub struct Task<S> {
42 shared_sequencer_client: Option<Client>,
44 config: Config,
45 signer: Arc<S>,
46 account_metadata: Option<AccountMetadata>,
47 prev_order: Option<u64>,
48 blobs: Arc<tokio::sync::Mutex<SSBlobs>>,
49 interval: tokio::time::Interval,
50}
51
52impl<S> NonInitializedTask<S> {
53 fn new(
55 config: Config,
56 blocks_events: BoxStream<SharedImportResult>,
57 signer: Arc<S>,
58 ) -> anyhow::Result<Self> {
59 if config.enabled && config.endpoints.is_none() {
60 return Err(anyhow::anyhow!(
61 "Shared sequencer is enabled but no endpoints are set"
62 ));
63 }
64
65 Ok(Self {
66 config,
67 blocks_events,
68 signer,
69 })
70 }
71}
72
73#[async_trait]
74impl<S> RunnableService for NonInitializedTask<S>
75where
76 S: Signer + 'static,
77{
78 const NAME: &'static str = "SharedSequencer";
79
80 type SharedData = EmptyShared;
81 type Task = Task<S>;
82 type TaskParams = ();
83
84 fn shared_data(&self) -> Self::SharedData {
85 EmptyShared
86 }
87
88 async fn into_task(
89 mut self,
90 _: &StateWatcher,
91 _: Self::TaskParams,
92 ) -> anyhow::Result<Self::Task> {
93 let shared_sequencer_client = match &self.config.endpoints {
94 Some(endpoints) => {
95 let ss = Client::new(endpoints.clone(), self.config.topic).await?;
96
97 if self.signer.is_available() {
98 let cosmos_public_address =
99 ss.sender_account_id(self.signer.as_ref())?;
100
101 tracing::info!(
102 "Shared sequencer uses account ID: {}",
103 cosmos_public_address
104 );
105 }
106
107 Some(ss)
108 }
109 _ => None,
110 };
111
112 let blobs = Arc::new(tokio::sync::Mutex::new(SSBlobs::new()));
113
114 if self.config.enabled {
115 let mut block_events = self.blocks_events;
116
117 tokio::task::spawn({
118 let blobs = blobs.clone();
119 async move {
120 while let Some(block) = block_events.next().await {
121 let blob = SSBlob {
122 block_height: *block.sealed_block.entity.header().height(),
123 block_id: block.sealed_block.entity.id(),
124 };
125 blobs.lock().await.push(blob);
126 }
127 }
128 });
129 }
130
131 Ok(Task {
132 interval: tokio::time::interval(self.config.block_posting_frequency),
133 shared_sequencer_client,
134 config: self.config,
135 signer: self.signer,
136 account_metadata: None,
137 prev_order: None,
138 blobs,
139 })
140 }
141}
142
143impl<S> Task<S>
144where
145 S: Signer,
146{
147 async fn ensure_account_metadata(&mut self) -> anyhow::Result<()> {
149 if self.account_metadata.is_some() {
150 return Ok(());
151 }
152 let ss = self
153 .shared_sequencer_client
154 .as_ref()
155 .expect("Shared sequencer client is not set");
156 self.account_metadata = Some(ss.get_account_meta(self.signer.as_ref()).await?);
157 Ok(())
158 }
159
160 async fn ensure_prev_order(&mut self) -> anyhow::Result<()> {
162 if self.prev_order.is_some() {
163 return Ok(());
164 }
165 let ss = self
166 .shared_sequencer_client
167 .as_ref()
168 .expect("Shared sequencer client is not set");
169 self.prev_order = ss.get_topic().await?.map(|f| f.order);
170 Ok(())
171 }
172}
173
174impl<S> RunnableTask for Task<S>
175where
176 S: Signer + 'static,
177{
178 async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
179 if !self.config.enabled {
180 let _ = watcher.while_started().await;
181 return TaskNextAction::Stop;
182 }
183
184 if let Err(err) = self.ensure_account_metadata().await {
185 tokio::time::sleep(Duration::from_secs(1)).await;
188 return TaskNextAction::ErrorContinue(err)
189 }
190 if let Err(err) = self.ensure_prev_order().await {
191 return TaskNextAction::ErrorContinue(err)
192 };
193
194 tokio::select! {
195 biased;
196 _ = watcher.while_started() => {
197 TaskNextAction::Stop
198 },
199 _ = self.interval.tick() => {
200 let blobs = {
201 let mut lock = self.blobs.lock().await;
202 core::mem::take(&mut *lock)
203 };
204 if blobs.is_empty() {
205 tokio::time::sleep(Duration::from_secs(1)).await;
206 return TaskNextAction::Continue;
207 };
208
209 let mut account = self.account_metadata.take().expect("Account metadata is not set");
210 let next_order = self.prev_order.map(|prev| prev.wrapping_add(1)).unwrap_or(0);
211 let ss = self.shared_sequencer_client
212 .as_ref().expect("Shared sequencer client is not set");
213 let blobs_bytes = postcard::to_allocvec(&blobs).expect("Failed to serialize SSBlob");
214
215 if let Err(err) = ss.send(self.signer.as_ref(), account, next_order, blobs_bytes).await {
216 return TaskNextAction::ErrorContinue(err);
217 }
218
219 tracing::info!("Posted block to shared sequencer {blobs:?}");
220 account.sequence = account.sequence.saturating_add(1);
221 self.prev_order = Some(next_order);
222 self.account_metadata = Some(account);
223 TaskNextAction::Continue
224 },
225 }
226 }
227
228 async fn shutdown(self) -> anyhow::Result<()> {
229 Ok(())
232 }
233}
234
235pub fn new_service<B, S>(
237 block_provider: B,
238 config: Config,
239 signer: Arc<S>,
240) -> anyhow::Result<ServiceRunner<NonInitializedTask<S>>>
241where
242 B: BlocksProvider,
243 S: Signer,
244{
245 let blocks_events = block_provider.subscribe();
246 Ok(ServiceRunner::new(NonInitializedTask::new(
247 config,
248 blocks_events,
249 signer,
250 )?))
251}