1use super::{
2 config::Config,
3 fetcher::{Config as FetcherConfig, Fetcher},
4 ingress::{FetchRequest, Mailbox, Message},
5 metrics, wire, Producer,
6};
7use crate::Consumer;
8use bytes::Bytes;
9use commonware_cryptography::PublicKey;
10use commonware_macros::select;
11use commonware_p2p::{
12 utils::codec::{wrap, WrappedSender},
13 Blocker, Manager, Receiver, Recipients, Sender,
14};
15use commonware_runtime::{
16 spawn_cell,
17 telemetry::metrics::{
18 histogram,
19 status::{CounterExt, GaugeExt, Status},
20 },
21 Clock, ContextCell, Handle, Metrics, Spawner,
22};
23use commonware_utils::{futures::Pool as FuturesPool, Span};
24use futures::{
25 channel::{mpsc, oneshot},
26 future::{self, Either},
27 StreamExt,
28};
29use rand::Rng;
30use std::{collections::HashMap, marker::PhantomData};
31use tracing::{debug, error, trace, warn};
32
/// The outcome of serving one inbound peer request.
///
/// Built by the future that `handle_network_request` pushes into the engine's
/// `serves` pool, and consumed by the event loop once that future completes.
struct Serve<E: Clock, P: PublicKey> {
    /// Timer from the `serve_duration` histogram, started when the request arrived.
    timer: histogram::Timer<E>,
    /// The peer that sent the request (and that will receive the response).
    peer: P,
    /// The request id, echoed back in the response message.
    id: u64,
    /// The produced value, or `Canceled` if the producer dropped its response
    /// sender without answering.
    result: Result<Bytes, oneshot::Canceled>,
}
40
/// Event-loop engine that fetches keyed data from peers on behalf of a
/// [`Consumer`] and serves peers' requests using a [`Producer`].
pub struct Engine<
    E: Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Manager<PublicKey = P>,
    B: Blocker<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Runtime context; the event loop is spawned on it by `start`.
    context: ContextCell<E>,

    /// Receives fetched values (`deliver`) and notifications of abandoned fetches (`failed`).
    consumer: Con,

    /// Produces values for inbound peer requests; cloned once per request.
    producer: Pro,

    /// Source of peer set updates, subscribed to when the event loop starts.
    manager: D,

    /// Used to block peers whose responses the consumer rejects.
    blocker: B,

    /// Id of the most recently applied peer set; older or repeated updates are ignored.
    last_peer_set_id: Option<u64>,

    /// Incoming messages from the [`Mailbox`] handed out by `new`.
    mailbox: mpsc::Receiver<Message<Key, P>>,

    /// Tracks pending, active, and retried fetches and blocked peers.
    fetcher: Fetcher<E, P, Key, NetS>,

    /// One `fetch_duration` timer per outstanding fetch. The event loop asserts this
    /// map has the same length as `fetcher` after every mailbox message.
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// In-flight futures serving inbound peer requests.
    serves: FuturesPool<Serve<E, P>>,

    /// Priority flag passed to the network sender when sending serve responses.
    priority_responses: bool,

    /// Engine metrics (counters, gauges, histograms).
    metrics: metrics::Metrics<E>,

    /// Marker for the network sender type.
    _s: PhantomData<NetS>,

    /// Marker for the network receiver type (only used via `start`/`run`).
    _r: PhantomData<NetR>,
}
96
97impl<
98 E: Clock + Spawner + Rng + Metrics,
99 P: PublicKey,
100 D: Manager<PublicKey = P>,
101 B: Blocker<PublicKey = P>,
102 Key: Span,
103 Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
104 Pro: Producer<Key = Key>,
105 NetS: Sender<PublicKey = P>,
106 NetR: Receiver<PublicKey = P>,
107 > Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
108{
109 pub fn new(context: E, cfg: Config<P, D, B, Key, Con, Pro>) -> (Self, Mailbox<Key, P>) {
113 let (sender, receiver) = mpsc::channel(cfg.mailbox_size);
114
115 let metrics = metrics::Metrics::init(context.clone());
117 let fetcher = Fetcher::new(
118 context.with_label("fetcher"),
119 FetcherConfig {
120 me: cfg.me,
121 initial: cfg.initial,
122 timeout: cfg.timeout,
123 retry_timeout: cfg.fetch_retry_timeout,
124 priority_requests: cfg.priority_requests,
125 },
126 );
127 (
128 Self {
129 context: ContextCell::new(context),
130 consumer: cfg.consumer,
131 producer: cfg.producer,
132 manager: cfg.manager,
133 blocker: cfg.blocker,
134 last_peer_set_id: None,
135 mailbox: receiver,
136 fetcher,
137 serves: FuturesPool::default(),
138 priority_responses: cfg.priority_responses,
139 metrics,
140 fetch_timers: HashMap::new(),
141 _s: PhantomData,
142 _r: PhantomData,
143 },
144 Mailbox::new(sender),
145 )
146 }
147
148 pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
154 spawn_cell!(self.context, self.run(network).await)
155 }
156
157 async fn run(mut self, network: (NetS, NetR)) {
159 let mut shutdown = self.context.stopped();
160 let peer_set_subscription = &mut self.manager.subscribe().await;
161
162 let (mut sender, mut receiver) = wrap((), network.0, network.1);
164
165 loop {
166 let _ = self
168 .metrics
169 .fetch_pending
170 .try_set(self.fetcher.len_pending());
171 let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
172 let _ = self
173 .metrics
174 .peers_blocked
175 .try_set(self.fetcher.len_blocked());
176 let _ = self.metrics.serve_processing.try_set(self.serves.len());
177
178 let deadline_pending = match self.fetcher.get_pending_deadline() {
180 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
181 None => Either::Right(future::pending()),
182 };
183
184 let deadline_active = match self.fetcher.get_active_deadline() {
186 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
187 None => Either::Right(future::pending()),
188 };
189
190 select! {
192 _ = &mut shutdown => {
193 debug!("shutdown");
194 self.serves.cancel_all();
195 return;
196 },
197
198 peer_set_update = peer_set_subscription.next() => {
200 let Some((id, _, all)) = peer_set_update else {
201 debug!("peer set subscription closed");
202 return;
203 };
204
205 if self.last_peer_set_id < Some(id) {
208 self.last_peer_set_id = Some(id);
209 self.fetcher.reconcile(all.as_ref());
210 }
211 },
212
213 _ = deadline_active => {
215 if let Some(key) = self.fetcher.pop_active() {
216 debug!(?key, "requester timeout");
217 self.metrics.fetch.inc(Status::Failure);
218 self.fetcher.add_retry(key);
219 }
220 },
221
222 _ = deadline_pending => {
224 self.fetcher.fetch(&mut sender).await;
225 },
226
227 msg = self.mailbox.next() => {
229 let Some(msg) = msg else {
230 error!("mailbox closed");
231 return;
232 };
233 match msg {
234 Message::Fetch(requests) => {
235 for FetchRequest { key, targets } in requests {
236 trace!(?key, "mailbox: fetch");
237
238 let is_new = !self.fetch_timers.contains_key(&key);
240
241 match targets {
243 Some(targets) => {
244 if is_new || self.fetcher.has_targets(&key) {
248 self.fetcher.add_targets(key.clone(), targets);
249 }
250 }
251 None => self.fetcher.clear_targets(&key),
252 }
253
254 if is_new {
256 self.fetch_timers.insert(key.clone(), self.metrics.fetch_duration.timer());
257 self.fetcher.add_ready(key);
258 } else {
259 trace!(?key, "updated targets for existing fetch");
260 }
261 }
262 }
263 Message::Cancel { key } => {
264 trace!(?key, "mailbox: cancel");
265 let mut guard = self.metrics.cancel.guard(Status::Dropped);
266 if self.fetcher.cancel(&key) {
267 guard.set(Status::Success);
268 self.fetch_timers.remove(&key).unwrap().cancel(); self.consumer.failed(key.clone(), ()).await;
270 }
271 }
272 Message::Retain { predicate } => {
273 trace!("mailbox: retain");
274
275 self.fetcher.retain(&predicate);
277
278 let before = self.fetch_timers.len();
280 let removed = self.fetch_timers.extract_if(|k, _| !predicate(k)).collect::<Vec<_>>();
281 for (key, timer) in removed {
282 timer.cancel();
283 self.consumer.failed(key, ()).await;
284 }
285
286 let removed = (before - self.fetch_timers.len()) as u64;
288 if removed == 0 {
289 self.metrics.cancel.inc(Status::Dropped);
290 } else {
291 self.metrics.cancel.inc_by(Status::Success, removed);
292 }
293 }
294 Message::Clear => {
295 trace!("mailbox: clear");
296
297 self.fetcher.clear();
299
300 let removed = self.fetch_timers.len() as u64;
302 for (key, timer) in self.fetch_timers.drain() {
303 timer.cancel();
304 self.consumer.failed(key, ()).await;
305 }
306
307 if removed == 0 {
309 self.metrics.cancel.inc(Status::Dropped);
310 } else {
311 self.metrics.cancel.inc_by(Status::Success, removed);
312 }
313 }
314 }
315 assert_eq!(self.fetcher.len(), self.fetch_timers.len());
316 },
317
318 serve = self.serves.next_completed() => {
320 let Serve { timer, peer, id, result } = serve;
321
322 match result {
324 Ok(_) => {
325 self.metrics.serve.inc(Status::Success);
326 }
327 Err(err) => {
328 debug!(?err, ?peer, ?id, "serve failed");
329 timer.cancel();
330 self.metrics.serve.inc(Status::Failure);
331 }
332 }
333
334 self.handle_serve(&mut sender, peer, id, result, self.priority_responses).await;
336 },
337
338 msg = receiver.recv() => {
340 let (peer, msg) = match msg {
342 Ok(msg) => msg,
343 Err(err) => {
344 error!(?err, "receiver closed");
345 return;
346 }
347 };
348
349 let msg = match msg {
351 Ok(msg) => msg,
352 Err(err) => {
353 trace!(?err, ?peer, "decode failed");
354 continue;
355 }
356 };
357 match msg.payload {
358 wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key).await,
359 wire::Payload::Response(response) => self.handle_network_response(peer, msg.id, response).await,
360 wire::Payload::Error => self.handle_network_error_response(peer, msg.id).await,
361 };
362 },
363 }
364 }
365 }
366
367 async fn handle_serve(
369 &mut self,
370 sender: &mut WrappedSender<NetS, wire::Message<Key>>,
371 peer: P,
372 id: u64,
373 response: Result<Bytes, oneshot::Canceled>,
374 priority: bool,
375 ) {
376 let payload: wire::Payload<Key> = response.map_or_else(
378 |_| wire::Payload::Error,
379 |data| wire::Payload::Response(data),
380 );
381 let msg = wire::Message { id, payload };
382
383 let result = sender
385 .send(Recipients::One(peer.clone()), msg, priority)
386 .await;
387
388 match result {
390 Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
391 Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
392 Ok(_) => trace!(?peer, ?id, "serve sent"),
393 };
394 }
395
396 async fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
398 trace!(?peer, ?id, "peer request");
400 let mut producer = self.producer.clone();
401 let timer = self.metrics.serve_duration.timer();
402 self.serves.push(async move {
403 let receiver = producer.produce(key).await;
404 let result = receiver.await;
405 Serve {
406 timer,
407 peer,
408 id,
409 result,
410 }
411 });
412 }
413
414 async fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
416 trace!(?peer, ?id, "peer response: data");
417
418 let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
420 return;
422 };
423
424 if self.consumer.deliver(key.clone(), response).await {
426 self.metrics.fetch.inc(Status::Success);
428 self.fetch_timers.remove(&key).unwrap(); self.fetcher.clear_targets(&key);
432 return;
433 }
434
435 self.blocker.block(peer.clone()).await;
438 self.fetcher.block(peer);
439 self.metrics.fetch.inc(Status::Failure);
440 self.fetcher.add_retry(key);
441 }
442
443 async fn handle_network_error_response(&mut self, peer: P, id: u64) {
445 trace!(?peer, ?id, "peer response: error");
446
447 let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
449 return;
451 };
452
453 self.metrics.fetch.inc(Status::Failure);
455 self.fetcher.add_retry(key);
456 }
457}