fuel_core_shared_sequencer/
service.rs

//! Defines the logic for interacting with the shared sequencer.
2
3use crate::{
4    http_api::AccountMetadata,
5    ports::{
6        BlocksProvider,
7        Signer,
8    },
9    Client,
10    Config,
11};
12use async_trait::async_trait;
13use core::time::Duration;
14use fuel_core_services::{
15    stream::BoxStream,
16    EmptyShared,
17    RunnableService,
18    RunnableTask,
19    ServiceRunner,
20    StateWatcher,
21    TaskNextAction,
22};
23use fuel_core_types::services::{
24    block_importer::SharedImportResult,
25    shared_sequencer::{
26        SSBlob,
27        SSBlobs,
28    },
29};
30use futures::StreamExt;
31use std::sync::Arc;
32
33/// Non-initialized shared sequencer task.
34pub struct NonInitializedTask<S> {
35    config: Config,
36    signer: Arc<S>,
37    blocks_events: BoxStream<SharedImportResult>,
38}
39
40/// Initialized shared sequencer task.
41pub struct Task<S> {
42    /// The client that communicates with shared sequencer.
43    shared_sequencer_client: Option<Client>,
44    config: Config,
45    signer: Arc<S>,
46    account_metadata: Option<AccountMetadata>,
47    prev_order: Option<u64>,
48    blobs: Arc<tokio::sync::Mutex<SSBlobs>>,
49    interval: tokio::time::Interval,
50}
51
52impl<S> NonInitializedTask<S> {
53    /// Create a new shared sequencer task.
54    fn new(
55        config: Config,
56        blocks_events: BoxStream<SharedImportResult>,
57        signer: Arc<S>,
58    ) -> anyhow::Result<Self> {
59        if config.enabled && config.endpoints.is_none() {
60            return Err(anyhow::anyhow!(
61                "Shared sequencer is enabled but no endpoints are set"
62            ));
63        }
64
65        Ok(Self {
66            config,
67            blocks_events,
68            signer,
69        })
70    }
71}
72
73#[async_trait]
74impl<S> RunnableService for NonInitializedTask<S>
75where
76    S: Signer + 'static,
77{
78    const NAME: &'static str = "SharedSequencer";
79
80    type SharedData = EmptyShared;
81    type Task = Task<S>;
82    type TaskParams = ();
83
84    fn shared_data(&self) -> Self::SharedData {
85        EmptyShared
86    }
87
88    async fn into_task(
89        mut self,
90        _: &StateWatcher,
91        _: Self::TaskParams,
92    ) -> anyhow::Result<Self::Task> {
93        let shared_sequencer_client = if let Some(endpoints) = &self.config.endpoints {
94            let ss = Client::new(endpoints.clone(), self.config.topic).await?;
95
96            if self.signer.is_available() {
97                let cosmos_public_address = ss.sender_account_id(self.signer.as_ref())?;
98
99                tracing::info!(
100                    "Shared sequencer uses account ID: {}",
101                    cosmos_public_address
102                );
103            }
104
105            Some(ss)
106        } else {
107            None
108        };
109
110        let blobs = Arc::new(tokio::sync::Mutex::new(SSBlobs::new()));
111
112        if self.config.enabled {
113            let mut block_events = self.blocks_events;
114
115            tokio::task::spawn({
116                let blobs = blobs.clone();
117                async move {
118                    while let Some(block) = block_events.next().await {
119                        let blob = SSBlob {
120                            block_height: *block.sealed_block.entity.header().height(),
121                            block_id: block.sealed_block.entity.id(),
122                        };
123                        blobs.lock().await.push(blob);
124                    }
125                }
126            });
127        }
128
129        Ok(Task {
130            interval: tokio::time::interval(self.config.block_posting_frequency),
131            shared_sequencer_client,
132            config: self.config,
133            signer: self.signer,
134            account_metadata: None,
135            prev_order: None,
136            blobs,
137        })
138    }
139}
140
141impl<S> Task<S>
142where
143    S: Signer,
144{
145    /// Fetch latest account metadata if it's not set
146    async fn ensure_account_metadata(&mut self) -> anyhow::Result<()> {
147        if self.account_metadata.is_some() {
148            return Ok(());
149        }
150        let ss = self
151            .shared_sequencer_client
152            .as_ref()
153            .expect("Shared sequencer client is not set");
154        self.account_metadata = Some(ss.get_account_meta(self.signer.as_ref()).await?);
155        Ok(())
156    }
157
158    /// Fetch previous order in the topic if it's not set
159    async fn ensure_prev_order(&mut self) -> anyhow::Result<()> {
160        if self.prev_order.is_some() {
161            return Ok(());
162        }
163        let ss = self
164            .shared_sequencer_client
165            .as_ref()
166            .expect("Shared sequencer client is not set");
167        self.prev_order = ss.get_topic().await?.map(|f| f.order);
168        Ok(())
169    }
170}
171
172impl<S> RunnableTask for Task<S>
173where
174    S: Signer + 'static,
175{
176    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
177        if !self.config.enabled {
178            let _ = watcher.while_started().await;
179            return TaskNextAction::Stop;
180        }
181
182        if let Err(err) = self.ensure_account_metadata().await {
183            // We don't want to spam the RPC endpoint with a lot of queries,
184            // so wait for one second before sending the next one.
185            tokio::time::sleep(Duration::from_secs(1)).await;
186            return TaskNextAction::ErrorContinue(err)
187        }
188        if let Err(err) = self.ensure_prev_order().await {
189            return TaskNextAction::ErrorContinue(err)
190        };
191
192        tokio::select! {
193            biased;
194            _ = watcher.while_started() => {
195                TaskNextAction::Stop
196            },
197            _ = self.interval.tick() => {
198                let blobs = {
199                    let mut lock = self.blobs.lock().await;
200                    core::mem::take(&mut *lock)
201                };
202                if blobs.is_empty() {
203                    tokio::time::sleep(Duration::from_secs(1)).await;
204                    return TaskNextAction::Continue;
205                };
206
207                let mut account = self.account_metadata.take().expect("Account metadata is not set");
208                let next_order = self.prev_order.map(|prev| prev.wrapping_add(1)).unwrap_or(0);
209                let ss =  self.shared_sequencer_client
210                    .as_ref().expect("Shared sequencer client is not set");
211                let blobs_bytes = postcard::to_allocvec(&blobs).expect("Failed to serialize SSBlob");
212
213                if let Err(err) = ss.send(self.signer.as_ref(), account, next_order, blobs_bytes).await {
214                    return TaskNextAction::ErrorContinue(err);
215                }
216
217                tracing::info!("Posted block to shared sequencer {blobs:?}");
218                account.sequence = account.sequence.saturating_add(1);
219                self.prev_order = Some(next_order);
220                self.account_metadata = Some(account);
221                TaskNextAction::Continue
222            },
223        }
224    }
225
226    async fn shutdown(self) -> anyhow::Result<()> {
227        // Nothing to shut down because we don't have any temporary state that should be dumped,
228        // and we don't spawn any sub-tasks that we need to finish or await.
229        Ok(())
230    }
231}
232
233/// Creates an instance of runnable shared sequencer service.
234pub fn new_service<B, S>(
235    block_provider: B,
236    config: Config,
237    signer: Arc<S>,
238) -> anyhow::Result<ServiceRunner<NonInitializedTask<S>>>
239where
240    B: BlocksProvider,
241    S: Signer,
242{
243    let blocks_events = block_provider.subscribe();
244    Ok(ServiceRunner::new(NonInitializedTask::new(
245        config,
246        blocks_events,
247        signer,
248    )?))
249}