fuel_core_shared_sequencer/
service.rs1use crate::{
4 http_api::AccountMetadata,
5 ports::{
6 BlocksProvider,
7 Signer,
8 },
9 Client,
10 Config,
11};
12use async_trait::async_trait;
13use core::time::Duration;
14use fuel_core_services::{
15 stream::BoxStream,
16 EmptyShared,
17 RunnableService,
18 RunnableTask,
19 ServiceRunner,
20 StateWatcher,
21 TaskNextAction,
22};
23use fuel_core_types::services::{
24 block_importer::SharedImportResult,
25 shared_sequencer::{
26 SSBlob,
27 SSBlobs,
28 },
29};
30use futures::StreamExt;
31use std::sync::Arc;
32
33pub struct NonInitializedTask<S> {
35 config: Config,
36 signer: Arc<S>,
37 blocks_events: BoxStream<SharedImportResult>,
38}
39
40pub struct Task<S> {
42 shared_sequencer_client: Option<Client>,
44 config: Config,
45 signer: Arc<S>,
46 account_metadata: Option<AccountMetadata>,
47 prev_order: Option<u64>,
48 blobs: Arc<tokio::sync::Mutex<SSBlobs>>,
49 interval: tokio::time::Interval,
50}
51
52impl<S> NonInitializedTask<S> {
53 fn new(
55 config: Config,
56 blocks_events: BoxStream<SharedImportResult>,
57 signer: Arc<S>,
58 ) -> anyhow::Result<Self> {
59 if config.enabled && config.endpoints.is_none() {
60 return Err(anyhow::anyhow!(
61 "Shared sequencer is enabled but no endpoints are set"
62 ));
63 }
64
65 Ok(Self {
66 config,
67 blocks_events,
68 signer,
69 })
70 }
71}
72
73#[async_trait]
74impl<S> RunnableService for NonInitializedTask<S>
75where
76 S: Signer + 'static,
77{
78 const NAME: &'static str = "SharedSequencer";
79
80 type SharedData = EmptyShared;
81 type Task = Task<S>;
82 type TaskParams = ();
83
84 fn shared_data(&self) -> Self::SharedData {
85 EmptyShared
86 }
87
88 async fn into_task(
89 mut self,
90 _: &StateWatcher,
91 _: Self::TaskParams,
92 ) -> anyhow::Result<Self::Task> {
93 let shared_sequencer_client = if let Some(endpoints) = &self.config.endpoints {
94 let ss = Client::new(endpoints.clone(), self.config.topic).await?;
95
96 if self.signer.is_available() {
97 let cosmos_public_address = ss.sender_account_id(self.signer.as_ref())?;
98
99 tracing::info!(
100 "Shared sequencer uses account ID: {}",
101 cosmos_public_address
102 );
103 }
104
105 Some(ss)
106 } else {
107 None
108 };
109
110 let blobs = Arc::new(tokio::sync::Mutex::new(SSBlobs::new()));
111
112 if self.config.enabled {
113 let mut block_events = self.blocks_events;
114
115 tokio::task::spawn({
116 let blobs = blobs.clone();
117 async move {
118 while let Some(block) = block_events.next().await {
119 let blob = SSBlob {
120 block_height: *block.sealed_block.entity.header().height(),
121 block_id: block.sealed_block.entity.id(),
122 };
123 blobs.lock().await.push(blob);
124 }
125 }
126 });
127 }
128
129 Ok(Task {
130 interval: tokio::time::interval(self.config.block_posting_frequency),
131 shared_sequencer_client,
132 config: self.config,
133 signer: self.signer,
134 account_metadata: None,
135 prev_order: None,
136 blobs,
137 })
138 }
139}
140
141impl<S> Task<S>
142where
143 S: Signer,
144{
145 async fn ensure_account_metadata(&mut self) -> anyhow::Result<()> {
147 if self.account_metadata.is_some() {
148 return Ok(());
149 }
150 let ss = self
151 .shared_sequencer_client
152 .as_ref()
153 .expect("Shared sequencer client is not set");
154 self.account_metadata = Some(ss.get_account_meta(self.signer.as_ref()).await?);
155 Ok(())
156 }
157
158 async fn ensure_prev_order(&mut self) -> anyhow::Result<()> {
160 if self.prev_order.is_some() {
161 return Ok(());
162 }
163 let ss = self
164 .shared_sequencer_client
165 .as_ref()
166 .expect("Shared sequencer client is not set");
167 self.prev_order = ss.get_topic().await?.map(|f| f.order);
168 Ok(())
169 }
170}
171
172impl<S> RunnableTask for Task<S>
173where
174 S: Signer + 'static,
175{
176 async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
177 if !self.config.enabled {
178 let _ = watcher.while_started().await;
179 return TaskNextAction::Stop;
180 }
181
182 if let Err(err) = self.ensure_account_metadata().await {
183 tokio::time::sleep(Duration::from_secs(1)).await;
186 return TaskNextAction::ErrorContinue(err)
187 }
188 if let Err(err) = self.ensure_prev_order().await {
189 return TaskNextAction::ErrorContinue(err)
190 };
191
192 tokio::select! {
193 biased;
194 _ = watcher.while_started() => {
195 TaskNextAction::Stop
196 },
197 _ = self.interval.tick() => {
198 let blobs = {
199 let mut lock = self.blobs.lock().await;
200 core::mem::take(&mut *lock)
201 };
202 if blobs.is_empty() {
203 tokio::time::sleep(Duration::from_secs(1)).await;
204 return TaskNextAction::Continue;
205 };
206
207 let mut account = self.account_metadata.take().expect("Account metadata is not set");
208 let next_order = self.prev_order.map(|prev| prev.wrapping_add(1)).unwrap_or(0);
209 let ss = self.shared_sequencer_client
210 .as_ref().expect("Shared sequencer client is not set");
211 let blobs_bytes = postcard::to_allocvec(&blobs).expect("Failed to serialize SSBlob");
212
213 if let Err(err) = ss.send(self.signer.as_ref(), account, next_order, blobs_bytes).await {
214 return TaskNextAction::ErrorContinue(err);
215 }
216
217 tracing::info!("Posted block to shared sequencer {blobs:?}");
218 account.sequence = account.sequence.saturating_add(1);
219 self.prev_order = Some(next_order);
220 self.account_metadata = Some(account);
221 TaskNextAction::Continue
222 },
223 }
224 }
225
226 async fn shutdown(self) -> anyhow::Result<()> {
227 Ok(())
230 }
231}
232
233pub fn new_service<B, S>(
235 block_provider: B,
236 config: Config,
237 signer: Arc<S>,
238) -> anyhow::Result<ServiceRunner<NonInitializedTask<S>>>
239where
240 B: BlocksProvider,
241 S: Signer,
242{
243 let blocks_events = block_provider.subscribe();
244 Ok(ServiceRunner::new(NonInitializedTask::new(
245 config,
246 blocks_events,
247 signer,
248 )?))
249}