fuel_core_shared_sequencer/
service.rs

1//! Defines the logic how to interact with the shared sequencer.
2
3use crate::{
4    Client,
5    Config,
6    http_api::AccountMetadata,
7    ports::{
8        BlocksProvider,
9        Signer,
10    },
11};
12use async_trait::async_trait;
13use core::time::Duration;
14use fuel_core_services::{
15    EmptyShared,
16    RunnableService,
17    RunnableTask,
18    ServiceRunner,
19    StateWatcher,
20    TaskNextAction,
21    stream::BoxStream,
22};
23use fuel_core_types::services::{
24    block_importer::SharedImportResult,
25    shared_sequencer::{
26        SSBlob,
27        SSBlobs,
28    },
29};
30use futures::StreamExt;
31use std::sync::Arc;
32
33/// Non-initialized shared sequencer task.
34pub struct NonInitializedTask<S> {
35    config: Config,
36    signer: Arc<S>,
37    blocks_events: BoxStream<SharedImportResult>,
38}
39
40/// Initialized shared sequencer task.
41pub struct Task<S> {
42    /// The client that communicates with shared sequencer.
43    shared_sequencer_client: Option<Client>,
44    config: Config,
45    signer: Arc<S>,
46    account_metadata: Option<AccountMetadata>,
47    prev_order: Option<u64>,
48    blobs: Arc<tokio::sync::Mutex<SSBlobs>>,
49    interval: tokio::time::Interval,
50}
51
52impl<S> NonInitializedTask<S> {
53    /// Create a new shared sequencer task.
54    fn new(
55        config: Config,
56        blocks_events: BoxStream<SharedImportResult>,
57        signer: Arc<S>,
58    ) -> anyhow::Result<Self> {
59        if config.enabled && config.endpoints.is_none() {
60            return Err(anyhow::anyhow!(
61                "Shared sequencer is enabled but no endpoints are set"
62            ));
63        }
64
65        Ok(Self {
66            config,
67            blocks_events,
68            signer,
69        })
70    }
71}
72
73#[async_trait]
74impl<S> RunnableService for NonInitializedTask<S>
75where
76    S: Signer + 'static,
77{
78    const NAME: &'static str = "SharedSequencer";
79
80    type SharedData = EmptyShared;
81    type Task = Task<S>;
82    type TaskParams = ();
83
84    fn shared_data(&self) -> Self::SharedData {
85        EmptyShared
86    }
87
88    async fn into_task(
89        mut self,
90        _: &StateWatcher,
91        _: Self::TaskParams,
92    ) -> anyhow::Result<Self::Task> {
93        let shared_sequencer_client = match &self.config.endpoints {
94            Some(endpoints) => {
95                let ss = Client::new(endpoints.clone(), self.config.topic).await?;
96
97                if self.signer.is_available() {
98                    let cosmos_public_address =
99                        ss.sender_account_id(self.signer.as_ref())?;
100
101                    tracing::info!(
102                        "Shared sequencer uses account ID: {}",
103                        cosmos_public_address
104                    );
105                }
106
107                Some(ss)
108            }
109            _ => None,
110        };
111
112        let blobs = Arc::new(tokio::sync::Mutex::new(SSBlobs::new()));
113
114        if self.config.enabled {
115            let mut block_events = self.blocks_events;
116
117            tokio::task::spawn({
118                let blobs = blobs.clone();
119                async move {
120                    while let Some(block) = block_events.next().await {
121                        let blob = SSBlob {
122                            block_height: *block.sealed_block.entity.header().height(),
123                            block_id: block.sealed_block.entity.id(),
124                        };
125                        blobs.lock().await.push(blob);
126                    }
127                }
128            });
129        }
130
131        Ok(Task {
132            interval: tokio::time::interval(self.config.block_posting_frequency),
133            shared_sequencer_client,
134            config: self.config,
135            signer: self.signer,
136            account_metadata: None,
137            prev_order: None,
138            blobs,
139        })
140    }
141}
142
143impl<S> Task<S>
144where
145    S: Signer,
146{
147    /// Fetch latest account metadata if it's not set
148    async fn ensure_account_metadata(&mut self) -> anyhow::Result<()> {
149        if self.account_metadata.is_some() {
150            return Ok(());
151        }
152        let ss = self
153            .shared_sequencer_client
154            .as_ref()
155            .expect("Shared sequencer client is not set");
156        self.account_metadata = Some(ss.get_account_meta(self.signer.as_ref()).await?);
157        Ok(())
158    }
159
160    /// Fetch previous order in the topic if it's not set
161    async fn ensure_prev_order(&mut self) -> anyhow::Result<()> {
162        if self.prev_order.is_some() {
163            return Ok(());
164        }
165        let ss = self
166            .shared_sequencer_client
167            .as_ref()
168            .expect("Shared sequencer client is not set");
169        self.prev_order = ss.get_topic().await?.map(|f| f.order);
170        Ok(())
171    }
172}
173
174impl<S> RunnableTask for Task<S>
175where
176    S: Signer + 'static,
177{
178    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
179        if !self.config.enabled {
180            let _ = watcher.while_started().await;
181            return TaskNextAction::Stop;
182        }
183
184        if let Err(err) = self.ensure_account_metadata().await {
185            // We don't want to spam the RPC endpoint with a lot of queries,
186            // so wait for one second before sending the next one.
187            tokio::time::sleep(Duration::from_secs(1)).await;
188            return TaskNextAction::ErrorContinue(err)
189        }
190        if let Err(err) = self.ensure_prev_order().await {
191            return TaskNextAction::ErrorContinue(err)
192        };
193
194        tokio::select! {
195            biased;
196            _ = watcher.while_started() => {
197                TaskNextAction::Stop
198            },
199            _ = self.interval.tick() => {
200                let blobs = {
201                    let mut lock = self.blobs.lock().await;
202                    core::mem::take(&mut *lock)
203                };
204                if blobs.is_empty() {
205                    tokio::time::sleep(Duration::from_secs(1)).await;
206                    return TaskNextAction::Continue;
207                };
208
209                let mut account = self.account_metadata.take().expect("Account metadata is not set");
210                let next_order = self.prev_order.map(|prev| prev.wrapping_add(1)).unwrap_or(0);
211                let ss =  self.shared_sequencer_client
212                    .as_ref().expect("Shared sequencer client is not set");
213                let blobs_bytes = postcard::to_allocvec(&blobs).expect("Failed to serialize SSBlob");
214
215                if let Err(err) = ss.send(self.signer.as_ref(), account, next_order, blobs_bytes).await {
216                    return TaskNextAction::ErrorContinue(err);
217                }
218
219                tracing::info!("Posted block to shared sequencer {blobs:?}");
220                account.sequence = account.sequence.saturating_add(1);
221                self.prev_order = Some(next_order);
222                self.account_metadata = Some(account);
223                TaskNextAction::Continue
224            },
225        }
226    }
227
228    async fn shutdown(self) -> anyhow::Result<()> {
229        // Nothing to shut down because we don't have any temporary state that should be dumped,
230        // and we don't spawn any sub-tasks that we need to finish or await.
231        Ok(())
232    }
233}
234
235/// Creates an instance of runnable shared sequencer service.
236pub fn new_service<B, S>(
237    block_provider: B,
238    config: Config,
239    signer: Arc<S>,
240) -> anyhow::Result<ServiceRunner<NonInitializedTask<S>>>
241where
242    B: BlocksProvider,
243    S: Signer,
244{
245    let blocks_events = block_provider.subscribe();
246    Ok(ServiceRunner::new(NonInitializedTask::new(
247        config,
248        blocks_events,
249        signer,
250    )?))
251}