1use std::{
2 collections::{BTreeSet, HashMap},
3 time::Duration,
4};
5
6use crate::{
7 indexer::Indexer,
8 seeder::{ingress::Mailbox, Config, Message},
9};
10use battleware_types::Seed;
11use commonware_codec::{DecodeExt, Encode};
12use commonware_consensus::{threshold_simplex::types::View, Viewable};
13use commonware_cryptography::{
14 bls12381::primitives::variant::{MinSig, Variant},
15 ed25519::PublicKey,
16};
17use commonware_p2p::{Receiver, Sender};
18use commonware_resolver::{p2p, Resolver};
19use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
20use commonware_storage::{
21 metadata::{self, Metadata},
22 ordinal::{self, Ordinal},
23 rmap::RMap,
24};
25use commonware_utils::sequence::U64;
26use futures::{
27 channel::{mpsc, oneshot},
28 StreamExt,
29};
30use governor::clock::Clock as GClock;
31use rand::RngCore;
32use tracing::{debug, info, warn};
33
34const BATCH_ENQUEUE: usize = 20;
35const LAST_UPLOADED_KEY: u64 = 0;
36const RETRY_DELAY: Duration = Duration::from_secs(10);
37
38pub struct Actor<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> {
39 context: R,
40 config: Config<I>,
41 inbound: Mailbox,
42 mailbox: mpsc::Receiver<Message>,
43 waiting: BTreeSet<View>,
44}
45
46impl<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> Actor<R, I> {
47 pub fn new(context: R, config: Config<I>) -> (Self, Mailbox) {
48 let (sender, mailbox) = mpsc::channel(config.mailbox_size);
50 let inbound = Mailbox::new(sender);
51
52 (
53 Self {
54 context,
55 config,
56 inbound: inbound.clone(),
57 mailbox,
58 waiting: BTreeSet::new(),
59 },
60 inbound,
61 )
62 }
63
64 pub fn start(
65 mut self,
66 backfill: (
67 impl Sender<PublicKey = PublicKey>,
68 impl Receiver<PublicKey = PublicKey>,
69 ),
70 ) -> Handle<()> {
71 self.context.spawn_ref()(self.run(backfill))
72 }
73
74 async fn run(
75 mut self,
76 backfill: (
77 impl Sender<PublicKey = PublicKey>,
78 impl Receiver<PublicKey = PublicKey>,
79 ),
80 ) {
81 let mut metadata = Metadata::<_, U64, u64>::init(
83 self.context.with_label("metadata"),
84 metadata::Config {
85 partition: format!("{}-metadata", self.config.partition_prefix),
86 codec_config: (),
87 },
88 )
89 .await
90 .expect("failed to initialize metadata");
91
92 let mut storage = Ordinal::init(
94 self.context.with_label("seeder"),
95 ordinal::Config {
96 partition: format!("{}-storage", self.config.partition_prefix),
97 items_per_blob: self.config.items_per_blob,
98 write_buffer: self.config.write_buffer,
99 replay_buffer: self.config.replay_buffer,
100 },
101 )
102 .await
103 .expect("failed to initialize seeder storage");
104
105 let (resolver_engine, mut resolver) = p2p::Engine::new(
107 self.context.with_label("resolver"),
108 p2p::Config {
109 coordinator: self.config.supervisor,
110 consumer: self.inbound.clone(),
111 producer: self.inbound.clone(),
112 mailbox_size: self.config.mailbox_size,
113 requester_config: commonware_p2p::utils::requester::Config {
114 public_key: self.config.public_key,
115 rate_limit: self.config.backfill_quota,
116 initial: Duration::from_secs(1),
117 timeout: Duration::from_secs(2),
118 },
119 fetch_retry_timeout: Duration::from_millis(100),
120 priority_requests: false,
121 priority_responses: false,
122 },
123 );
124 resolver_engine.start(backfill);
125
126 let mut listeners: HashMap<View, Vec<oneshot::Sender<Seed>>> = HashMap::new();
128
129 let missing = storage.missing_items(1, BATCH_ENQUEUE);
131 for next in missing {
132 resolver.fetch(next.into()).await;
133 self.waiting.insert(next);
134 }
135
136 let mut uploads_outstanding = 0;
138 let mut cursor = metadata
139 .get(&LAST_UPLOADED_KEY.into())
140 .cloned()
141 .unwrap_or(1);
142 let mut boundary = cursor;
143 let mut tracked_uploads = RMap::new();
144 info!(cursor, "initial seed cursor");
145
146 loop {
148 let Some(message) = self.mailbox.next().await else {
149 warn!("mailbox closed");
150 break;
151 };
152 match message {
153 Message::Uploaded { view } => {
154 uploads_outstanding -= 1;
156
157 tracked_uploads.insert(view);
159
160 let Some(end_region) = tracked_uploads.next_gap(boundary).0 else {
162 continue;
163 };
164 if end_region > boundary {
165 boundary = end_region;
166 metadata.put(LAST_UPLOADED_KEY.into(), end_region);
167 metadata.sync().await.expect("failed to sync metadata");
168 info!(boundary, "updated seed upload marker");
169 }
170 }
171 Message::Put(seed) => {
172 self.waiting.remove(&seed.view);
173
174 if !storage.has(seed.view()) {
176 storage
177 .put(seed.view(), seed.signature)
178 .await
179 .expect("failed to put seed");
180 storage.sync().await.expect("failed to sync seed");
181 }
182
183 if let Some(listeners) = listeners.remove(&seed.view) {
185 for listener in listeners {
186 listener.send(seed.clone()).expect("failed to send seed");
187 }
188 }
189
190 if let Some(current_end) = storage.next_gap(1).0 {
192 let current_end = U64::from(current_end);
193 resolver.retain(move |x| x > ¤t_end).await;
194 }
195
196 let missing = storage.missing_items(1, BATCH_ENQUEUE);
198 if missing.is_empty() {
199 continue;
200 }
201 for next in missing {
202 if !self.waiting.insert(next) {
203 continue;
204 }
205 resolver.fetch(next.into()).await;
206 }
207 }
208 Message::Get { view, response } => {
209 let Some(signature) = storage.get(view).await.expect("failed to get seed")
210 else {
211 if self.waiting.insert(view) {
212 resolver.fetch(view.into()).await;
213 }
214 listeners.entry(view).or_default().push(response);
215 continue;
216 };
217 response
218 .send(Seed { view, signature })
219 .expect("failed to send seed");
220 }
221 Message::Deliver {
222 view,
223 signature,
224 response,
225 } => {
226 let Ok(signature) =
228 <<MinSig as Variant>::Signature>::decode(&mut signature.as_ref())
229 else {
230 response.send(false).expect("failed to send none");
231 continue;
232 };
233 let seed = Seed::new(view, signature);
234 if !seed.verify(&self.config.namespace, &self.config.identity) {
235 response.send(false).expect("failed to send false");
236 continue;
237 }
238
239 self.waiting.remove(&view);
240
241 response.send(true).expect("failed to send true");
243
244 if !storage.has(view) {
246 storage
247 .put(view, signature)
248 .await
249 .expect("failed to put seed");
250 storage.sync().await.expect("failed to sync seed");
251 }
252
253 if let Some(listeners) = listeners.remove(&view) {
255 for listener in listeners {
256 listener.send(seed.clone()).expect("failed to send seed");
257 }
258 }
259
260 if let Some(current_end) = storage.next_gap(1).0 {
262 let current_end = U64::from(current_end);
263 resolver.retain(move |x| x > ¤t_end).await;
264 }
265
266 let missing = storage.missing_items(1, BATCH_ENQUEUE);
268 for next in missing {
269 if !self.waiting.insert(next) {
270 continue;
271 }
272 resolver.fetch(next.into()).await;
273 }
274 }
275 Message::Produce { view, response } => {
276 let Some(encoded) = storage.get(view).await.expect("failed to get seed") else {
278 continue;
279 };
280 response
281 .send(encoded.encode().into())
282 .expect("failed to send seed");
283 }
284 }
285
286 while uploads_outstanding < self.config.max_uploads_outstanding {
288 let Some(seed) = storage.get(cursor).await.expect("failed to get seed") else {
290 break;
291 };
292
293 uploads_outstanding += 1;
295
296 self.context.with_label("seed_submit").spawn({
298 let seed = Seed::new(cursor, seed);
299 let indexer = self.config.indexer.clone();
300 let mut channel = self.inbound.clone();
301 move |context| async move {
302 let view = seed.view();
303 let mut attempts = 1;
304 loop {
305 let Err(e) = indexer.submit_seed(seed.clone()).await else {
306 break;
307 };
308 warn!(?e, attempts, "failed to upload seed");
309 context.sleep(RETRY_DELAY).await;
310 attempts += 1;
311 }
312 debug!(view, attempts, "seed uploaded to indexer");
313 channel.uploaded(view).await;
314 }
315 });
316
317 cursor += 1;
319 }
320 }
321 }
322}