battleware_node/seeder/
actor.rs

1use std::{
2    collections::{BTreeSet, HashMap},
3    time::Duration,
4};
5
6use crate::{
7    indexer::Indexer,
8    seeder::{ingress::Mailbox, Config, Message},
9};
10use battleware_types::Seed;
11use commonware_codec::{DecodeExt, Encode};
12use commonware_consensus::{threshold_simplex::types::View, Viewable};
13use commonware_cryptography::{
14    bls12381::primitives::variant::{MinSig, Variant},
15    ed25519::PublicKey,
16};
17use commonware_p2p::{Receiver, Sender};
18use commonware_resolver::{p2p, Resolver};
19use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
20use commonware_storage::{
21    metadata::{self, Metadata},
22    ordinal::{self, Ordinal},
23    rmap::RMap,
24};
25use commonware_utils::sequence::U64;
26use futures::{
27    channel::{mpsc, oneshot},
28    StreamExt,
29};
30use governor::clock::Clock as GClock;
31use rand::RngCore;
32use tracing::{debug, info, warn};
33
/// Maximum number of missing seeds requested from the resolver in one pass.
const BATCH_ENQUEUE: usize = 20;
/// Metadata key under which the seed upload marker is persisted.
const LAST_UPLOADED_KEY: u64 = 0;
/// Delay between attempts when submitting a seed to the indexer fails.
const RETRY_DELAY: Duration = Duration::from_secs(10);
37
38pub struct Actor<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> {
39    context: R,
40    config: Config<I>,
41    inbound: Mailbox,
42    mailbox: mpsc::Receiver<Message>,
43    waiting: BTreeSet<View>,
44}
45
46impl<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> Actor<R, I> {
47    pub fn new(context: R, config: Config<I>) -> (Self, Mailbox) {
48        // Create mailbox
49        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
50        let inbound = Mailbox::new(sender);
51
52        (
53            Self {
54                context,
55                config,
56                inbound: inbound.clone(),
57                mailbox,
58                waiting: BTreeSet::new(),
59            },
60            inbound,
61        )
62    }
63
64    pub fn start(
65        mut self,
66        backfill: (
67            impl Sender<PublicKey = PublicKey>,
68            impl Receiver<PublicKey = PublicKey>,
69        ),
70    ) -> Handle<()> {
71        self.context.spawn_ref()(self.run(backfill))
72    }
73
74    async fn run(
75        mut self,
76        backfill: (
77            impl Sender<PublicKey = PublicKey>,
78            impl Receiver<PublicKey = PublicKey>,
79        ),
80    ) {
81        // Create metadata
82        let mut metadata = Metadata::<_, U64, u64>::init(
83            self.context.with_label("metadata"),
84            metadata::Config {
85                partition: format!("{}-metadata", self.config.partition_prefix),
86                codec_config: (),
87            },
88        )
89        .await
90        .expect("failed to initialize metadata");
91
92        // Create storage
93        let mut storage = Ordinal::init(
94            self.context.with_label("seeder"),
95            ordinal::Config {
96                partition: format!("{}-storage", self.config.partition_prefix),
97                items_per_blob: self.config.items_per_blob,
98                write_buffer: self.config.write_buffer,
99                replay_buffer: self.config.replay_buffer,
100            },
101        )
102        .await
103        .expect("failed to initialize seeder storage");
104
105        // Create resolver
106        let (resolver_engine, mut resolver) = p2p::Engine::new(
107            self.context.with_label("resolver"),
108            p2p::Config {
109                coordinator: self.config.supervisor,
110                consumer: self.inbound.clone(),
111                producer: self.inbound.clone(),
112                mailbox_size: self.config.mailbox_size,
113                requester_config: commonware_p2p::utils::requester::Config {
114                    public_key: self.config.public_key,
115                    rate_limit: self.config.backfill_quota,
116                    initial: Duration::from_secs(1),
117                    timeout: Duration::from_secs(2),
118                },
119                fetch_retry_timeout: Duration::from_millis(100),
120                priority_requests: false,
121                priority_responses: false,
122            },
123        );
124        resolver_engine.start(backfill);
125
126        // Track waiters for each seed
127        let mut listeners: HashMap<View, Vec<oneshot::Sender<Seed>>> = HashMap::new();
128
129        // Start by fetching the first missing seeds
130        let missing = storage.missing_items(1, BATCH_ENQUEUE);
131        for next in missing {
132            resolver.fetch(next.into()).await;
133            self.waiting.insert(next);
134        }
135
136        // Track uploads
137        let mut uploads_outstanding = 0;
138        let mut cursor = metadata
139            .get(&LAST_UPLOADED_KEY.into())
140            .cloned()
141            .unwrap_or(1);
142        let mut boundary = cursor;
143        let mut tracked_uploads = RMap::new();
144        info!(cursor, "initial seed cursor");
145
146        // Process messages
147        loop {
148            let Some(message) = self.mailbox.next().await else {
149                warn!("mailbox closed");
150                break;
151            };
152            match message {
153                Message::Uploaded { view } => {
154                    // Decrement uploads outstanding
155                    uploads_outstanding -= 1;
156
157                    // Track uploaded view
158                    tracked_uploads.insert(view);
159
160                    // Update metadata if lowest uploaded has increased
161                    let Some(end_region) = tracked_uploads.next_gap(boundary).0 else {
162                        continue;
163                    };
164                    if end_region > boundary {
165                        boundary = end_region;
166                        metadata.put(LAST_UPLOADED_KEY.into(), end_region);
167                        metadata.sync().await.expect("failed to sync metadata");
168                        info!(boundary, "updated seed upload marker");
169                    }
170                }
171                Message::Put(seed) => {
172                    self.waiting.remove(&seed.view);
173
174                    // Store seed
175                    if !storage.has(seed.view()) {
176                        storage
177                            .put(seed.view(), seed.signature)
178                            .await
179                            .expect("failed to put seed");
180                        storage.sync().await.expect("failed to sync seed");
181                    }
182
183                    // If there were any listeners, send them the seed
184                    if let Some(listeners) = listeners.remove(&seed.view) {
185                        for listener in listeners {
186                            listener.send(seed.clone()).expect("failed to send seed");
187                        }
188                    }
189
190                    // Cancel resolver
191                    if let Some(current_end) = storage.next_gap(1).0 {
192                        let current_end = U64::from(current_end);
193                        resolver.retain(move |x| x > &current_end).await;
194                    }
195
196                    // Enqueue missing seeds
197                    let missing = storage.missing_items(1, BATCH_ENQUEUE);
198                    if missing.is_empty() {
199                        continue;
200                    }
201                    for next in missing {
202                        if !self.waiting.insert(next) {
203                            continue;
204                        }
205                        resolver.fetch(next.into()).await;
206                    }
207                }
208                Message::Get { view, response } => {
209                    let Some(signature) = storage.get(view).await.expect("failed to get seed")
210                    else {
211                        if self.waiting.insert(view) {
212                            resolver.fetch(view.into()).await;
213                        }
214                        listeners.entry(view).or_default().push(response);
215                        continue;
216                    };
217                    response
218                        .send(Seed { view, signature })
219                        .expect("failed to send seed");
220                }
221                Message::Deliver {
222                    view,
223                    signature,
224                    response,
225                } => {
226                    // Verify signature
227                    let Ok(signature) =
228                        <<MinSig as Variant>::Signature>::decode(&mut signature.as_ref())
229                    else {
230                        response.send(false).expect("failed to send none");
231                        continue;
232                    };
233                    let seed = Seed::new(view, signature);
234                    if !seed.verify(&self.config.namespace, &self.config.identity) {
235                        response.send(false).expect("failed to send false");
236                        continue;
237                    }
238
239                    self.waiting.remove(&view);
240
241                    // Notify resolver
242                    response.send(true).expect("failed to send true");
243
244                    // Store seed
245                    if !storage.has(view) {
246                        storage
247                            .put(view, signature)
248                            .await
249                            .expect("failed to put seed");
250                        storage.sync().await.expect("failed to sync seed");
251                    }
252
253                    // Notify listeners
254                    if let Some(listeners) = listeners.remove(&view) {
255                        for listener in listeners {
256                            listener.send(seed.clone()).expect("failed to send seed");
257                        }
258                    }
259
260                    // Cancel resolver
261                    if let Some(current_end) = storage.next_gap(1).0 {
262                        let current_end = U64::from(current_end);
263                        resolver.retain(move |x| x > &current_end).await;
264                    }
265
266                    // Enqueue missing seeds
267                    let missing = storage.missing_items(1, BATCH_ENQUEUE);
268                    for next in missing {
269                        if !self.waiting.insert(next) {
270                            continue;
271                        }
272                        resolver.fetch(next.into()).await;
273                    }
274                }
275                Message::Produce { view, response } => {
276                    // Serve seed from storage
277                    let Some(encoded) = storage.get(view).await.expect("failed to get seed") else {
278                        continue;
279                    };
280                    response
281                        .send(encoded.encode().into())
282                        .expect("failed to send seed");
283                }
284            }
285
286            // Attempt to upload any seeds
287            while uploads_outstanding < self.config.max_uploads_outstanding {
288                // Get next seed
289                let Some(seed) = storage.get(cursor).await.expect("failed to get seed") else {
290                    break;
291                };
292
293                // Increment uploads outstanding
294                uploads_outstanding += 1;
295
296                // Upload seed to indexer
297                self.context.with_label("seed_submit").spawn({
298                    let seed = Seed::new(cursor, seed);
299                    let indexer = self.config.indexer.clone();
300                    let mut channel = self.inbound.clone();
301                    move |context| async move {
302                        let view = seed.view();
303                        let mut attempts = 1;
304                        loop {
305                            let Err(e) = indexer.submit_seed(seed.clone()).await else {
306                                break;
307                            };
308                            warn!(?e, attempts, "failed to upload seed");
309                            context.sleep(RETRY_DELAY).await;
310                            attempts += 1;
311                        }
312                        debug!(view, attempts, "seed uploaded to indexer");
313                        channel.uploaded(view).await;
314                    }
315                });
316
317                // Increment cursor
318                cursor += 1;
319            }
320        }
321    }
322}