1use super::{
2 config::Config,
3 fetcher::{Config as FetcherConfig, Fetcher},
4 ingress::{FetchRequest, Mailbox, Message},
5 metrics, wire, Producer,
6};
7use crate::Consumer;
8use bytes::Bytes;
9use commonware_cryptography::PublicKey;
10use commonware_macros::select;
11use commonware_p2p::{
12 utils::codec::{wrap, WrappedSender},
13 Blocker, Manager, Receiver, Recipients, Sender,
14};
15use commonware_runtime::{
16 spawn_cell,
17 telemetry::metrics::{
18 histogram,
19 status::{CounterExt, GaugeExt, Status},
20 },
21 Clock, ContextCell, Handle, Metrics, Spawner,
22};
23use commonware_utils::{futures::Pool as FuturesPool, Span};
24use futures::{
25 channel::{mpsc, oneshot},
26 future::{self, Either},
27 StreamExt,
28};
29use rand::Rng;
30use std::{collections::HashMap, marker::PhantomData};
31use tracing::{debug, error, trace, warn};
32
/// The completed result of serving a peer's request, yielded by the
/// engine's pool of in-flight serve futures.
struct Serve<E: Clock, P: PublicKey> {
    /// Measures serve duration; canceled (not recorded) when the serve fails.
    timer: histogram::Timer<E>,
    /// The peer that issued the request.
    peer: P,
    /// The peer-assigned request id, echoed back in the response.
    id: u64,
    /// The produced data, or `Canceled` if the producer dropped its oneshot.
    result: Result<Bytes, oneshot::Canceled>,
}
40
/// An engine that fetches keyed data from other peers (delivering results to
/// a [`Consumer`]) and serves data produced by a [`Producer`] to other peers.
///
/// Driven by [`Engine::start`], which runs a single event loop until shutdown.
pub struct Engine<
    E: Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Manager<PublicKey = P>,
    B: Blocker<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Runtime context (clock, spawning, shutdown signal).
    context: ContextCell<E>,

    /// Receives fetched values (`deliver`) and cancellation notices (`failed`).
    consumer: Con,

    /// Produces data for incoming peer requests; cloned once per request.
    producer: Pro,

    /// Source of peer-set updates used to reconcile the fetcher's peer list.
    manager: D,

    /// Used to block peers whose responses the consumer rejects.
    blocker: B,

    /// Highest peer-set id applied so far; older updates are ignored.
    last_peer_set_id: Option<u64>,

    /// Inbound control messages (fetch / cancel / retain / clear).
    mailbox: mpsc::Receiver<Message<Key, P>>,

    /// Tracks pending and in-flight fetches, their targets, and deadlines.
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Per-key fetch-duration timers. Invariant: its key set mirrors the
    /// fetcher's tracked keys (asserted after each mailbox message).
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// In-flight serve operations awaiting their producer's result.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses to peers are sent as priority network messages.
    priority_responses: bool,

    /// Engine metrics (fetch/serve counters, gauges, duration histograms).
    metrics: metrics::Metrics<E>,

    /// Binds the otherwise-unused receiver type parameter `NetR`.
    _r: PhantomData<NetR>,
}
95
impl<
        E: Clock + Spawner + Rng + Metrics,
        P: PublicKey,
        D: Manager<PublicKey = P>,
        B: Blocker<PublicKey = P>,
        Key: Span,
        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
        Pro: Producer<Key = Key>,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
{
    /// Creates a new engine from `cfg`, returning it together with a
    /// [`Mailbox`] for submitting fetch/cancel/retain/clear requests
    /// (backed by a channel bounded by `cfg.mailbox_size`).
    pub fn new(context: E, cfg: Config<P, D, B, Key, Con, Pro>) -> (Self, Mailbox<Key, P>) {
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);

        let metrics = metrics::Metrics::init(context.clone());
        let fetcher = Fetcher::new(
            context.with_label("fetcher"),
            FetcherConfig {
                me: cfg.me,
                initial: cfg.initial,
                timeout: cfg.timeout,
                retry_timeout: cfg.fetch_retry_timeout,
                priority_requests: cfg.priority_requests,
            },
        );
        (
            Self {
                context: ContextCell::new(context),
                consumer: cfg.consumer,
                producer: cfg.producer,
                manager: cfg.manager,
                blocker: cfg.blocker,
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                fetch_timers: HashMap::new(),
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

    /// Starts the engine's event loop on the stored context, consuming `self`.
    ///
    /// Returns a [`Handle`] that resolves when the loop exits.
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// The engine's event loop: drives fetch deadlines, mailbox commands,
    /// completed serves, and inbound network traffic until shutdown or a
    /// closed channel terminates it.
    async fn run(mut self, network: (NetS, NetR)) {
        let mut shutdown = self.context.stopped();
        let peer_set_subscription = &mut self.manager.subscribe().await;

        // Wrap the raw network sender/receiver with the wire codec.
        let (mut sender, mut receiver) = wrap((), network.0, network.1);

        loop {
            // Refresh gauges at the top of every iteration (best-effort;
            // failures to set are deliberately ignored).
            let _ = self
                .metrics
                .fetch_pending
                .try_set(self.fetcher.len_pending());
            let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
            let _ = self
                .metrics
                .peers_blocked
                .try_set(self.fetcher.len_blocked());
            let _ = self.metrics.serve_processing.try_set(self.serves.len());

            // Sleep until the next pending-fetch deadline, if any.
            let deadline_pending = match self.fetcher.get_pending_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Sleep until the next in-flight request deadline, if any.
            let deadline_active = match self.fetcher.get_active_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            select! {
                // Shutdown: drop all in-flight serves and exit.
                _ = &mut shutdown => {
                    debug!("shutdown");
                    self.serves.cancel_all();
                    return;
                },

                // Peer set changed: reconcile the fetcher's peer list, but
                // only for updates newer than the last one applied.
                peer_set_update = peer_set_subscription.next() => {
                    let Some((id, _, all)) = peer_set_update else {
                        debug!("peer set subscription closed");
                        return;
                    };

                    if self.last_peer_set_id < Some(id) {
                        self.last_peer_set_id = Some(id);
                        self.fetcher.reconcile(all.as_ref());
                    }
                },

                // An in-flight request timed out: count a failure and queue
                // the key for retry.
                _ = deadline_active => {
                    if let Some(key) = self.fetcher.pop_active() {
                        debug!(?key, "requester timeout");
                        self.metrics.fetch.inc(Status::Failure);
                        self.fetcher.add_retry(key);
                    }
                },

                // A pending fetch is due: issue the next request.
                _ = deadline_pending => {
                    self.fetcher.fetch(&mut sender).await;
                },

                // Control message from the mailbox.
                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        error!("mailbox closed");
                        return;
                    };
                    match msg {
                        Message::Fetch(requests) => {
                            for FetchRequest { key, targets } in requests {
                                trace!(?key, "mailbox: fetch");

                                // A key is new iff no timer exists for it yet.
                                let is_new = !self.fetch_timers.contains_key(&key);

                                match targets {
                                    Some(targets) => {
                                        // Apply targets only for a new fetch or one that
                                        // already has targets (an existing untargeted
                                        // fetch is not narrowed).
                                        if is_new || self.fetcher.has_targets(&key) {
                                            self.fetcher.add_targets(key.clone(), targets);
                                        }
                                    }
                                    None => self.fetcher.clear_targets(&key),
                                }

                                if is_new {
                                    // Start timing and schedule the fetch.
                                    self.fetch_timers.insert(key.clone(), self.metrics.fetch_duration.timer());
                                    self.fetcher.add_ready(key);
                                } else {
                                    trace!(?key, "updated targets for existing fetch");
                                }
                            }
                        }
                        Message::Cancel { key } => {
                            trace!(?key, "mailbox: cancel");
                            // Recorded as Dropped unless the key was actually tracked.
                            let mut guard = self.metrics.cancel.guard(Status::Dropped);
                            if self.fetcher.cancel(&key) {
                                guard.set(Status::Success);
                                // Timer must exist: fetch_timers mirrors the fetcher's keys.
                                self.fetch_timers.remove(&key).unwrap().cancel();
                                self.consumer.failed(key.clone(), ()).await;
                            }
                        }
                        Message::Retain { predicate } => {
                            trace!("mailbox: retain");

                            // Drop every fetch whose key fails the predicate.
                            self.fetcher.retain(&predicate);

                            // Remove the matching timers and notify the consumer
                            // of each canceled fetch.
                            let before = self.fetch_timers.len();
                            let removed = self.fetch_timers.extract_if(|k, _| !predicate(k)).collect::<Vec<_>>();
                            for (key, timer) in removed {
                                timer.cancel();
                                self.consumer.failed(key, ()).await;
                            }

                            // Record how many fetches were canceled (Dropped if none).
                            let removed = (before - self.fetch_timers.len()) as u64;
                            if removed == 0 {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, removed);
                            }
                        }
                        Message::Clear => {
                            trace!("mailbox: clear");

                            // Drop every outstanding fetch.
                            self.fetcher.clear();

                            // Cancel all timers and notify the consumer per key.
                            let removed = self.fetch_timers.len() as u64;
                            for (key, timer) in self.fetch_timers.drain() {
                                timer.cancel();
                                self.consumer.failed(key, ()).await;
                            }

                            // Record how many fetches were canceled (Dropped if none).
                            if removed == 0 {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, removed);
                            }
                        }
                    }
                    // Invariant: exactly one timer per tracked fetch.
                    assert_eq!(self.fetcher.len(), self.fetch_timers.len());
                },

                // A serve (producer lookup) completed: record metrics and reply.
                serve = self.serves.next_completed() => {
                    let Serve { timer, peer, id, result } = serve;

                    match result {
                        Ok(_) => {
                            self.metrics.serve.inc(Status::Success);
                        }
                        Err(err) => {
                            debug!(?err, ?peer, ?id, "serve failed");
                            // Don't record a duration for failed serves.
                            timer.cancel();
                            self.metrics.serve.inc(Status::Failure);
                        }
                    }

                    self.handle_serve(&mut sender, peer, id, result, self.priority_responses).await;
                },

                // Inbound network message.
                msg = receiver.recv() => {
                    // A closed receiver is fatal; a decode failure is not.
                    let (peer, msg) = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            error!(?err, "receiver closed");
                            return;
                        }
                    };

                    let msg = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            trace!(?err, ?peer, "decode failed");
                            continue;
                        }
                    };
                    match msg.payload {
                        wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key).await,
                        wire::Payload::Response(response) => self.handle_network_response(peer, msg.id, response).await,
                        wire::Payload::Error => self.handle_network_error_response(peer, msg.id).await,
                    };
                },
            }
        }
    }

    /// Sends the outcome of a serve back to `peer`, echoing the request `id`.
    ///
    /// A successful `response` becomes a `Response` payload; a canceled one
    /// becomes an `Error` payload. Send failures are logged, not propagated.
    async fn handle_serve(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Result<Bytes, oneshot::Canceled>,
        priority: bool,
    ) {
        let payload: wire::Payload<Key> = response.map_or_else(
            |_| wire::Payload::Error,
            |data| wire::Payload::Response(data),
        );
        let msg = wire::Message { id, payload };

        let result = sender
            .send(Recipients::One(peer.clone()), msg, priority)
            .await;

        match result {
            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
            // An empty recipient list means the message reached no one.
            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
            Ok(_) => trace!(?peer, ?id, "serve sent"),
        };
    }

    /// Handles an inbound request for `key` from `peer`: asks the producer
    /// for the data and parks the pending result in the serve pool, to be
    /// answered when it completes.
    async fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        trace!(?peer, ?id, "peer request");

        // Clone the producer so the lookup can run outside the event loop.
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer();
        self.serves.push(async move {
            let receiver = producer.produce(key).await;
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

    /// Handles a data response from `peer` for request `id`.
    ///
    /// If it matches an outstanding fetch and the consumer accepts the data,
    /// the fetch completes. If the consumer rejects it, the peer is blocked
    /// and the fetch is queued for retry.
    async fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
        trace!(?peer, ?id, "peer response: data");

        // Ignore responses that don't match an outstanding request.
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            return;
        };

        if self.consumer.deliver(key.clone(), response).await {
            self.metrics.fetch.inc(Status::Success);
            // Timer must exist: fetch_timers mirrors the fetcher's keys.
            self.fetch_timers.remove(&key).unwrap();
            self.fetcher.clear_targets(&key);
            return;
        }

        // The consumer rejected the data: block the peer (both at the network
        // layer and in the fetcher) and retry the fetch elsewhere.
        self.blocker.block(peer.clone()).await;
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }

    /// Handles an explicit error response from `peer` for request `id`:
    /// counts a failure and queues the key for retry.
    async fn handle_network_error_response(&mut self, peer: P, id: u64) {
        trace!(?peer, ?id, "peer response: error");

        // Ignore errors that don't match an outstanding request.
        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            return;
        };

        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }
}