1use super::{
2 config::Config,
3 fetcher::{Config as FetcherConfig, Fetcher},
4 ingress::{FetchRequest, Mailbox, Message},
5 metrics, wire, Producer,
6};
7use crate::Consumer;
8use bytes::Bytes;
9use commonware_cryptography::PublicKey;
10use commonware_macros::select_loop;
11use commonware_p2p::{
12 utils::codec::{wrap, WrappedSender},
13 Blocker, Provider, Receiver, Recipients, Sender,
14};
15use commonware_runtime::{
16 spawn_cell,
17 telemetry::metrics::{
18 histogram,
19 status::{CounterExt, GaugeExt, Status},
20 },
21 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
22};
23use commonware_utils::{
24 channel::{mpsc, oneshot},
25 futures::Pool as FuturesPool,
26 Span,
27};
28use futures::future::{self, Either};
29use rand::Rng;
30use std::{collections::HashMap, marker::PhantomData};
31use tracing::{debug, error, trace, warn};
32
/// Outcome of serving a single peer request, paired with the metadata
/// needed to reply to the peer and to record serve metrics.
struct Serve<E: Clock, P: PublicKey> {
    /// Measures serve duration; canceled on failure so no observation is
    /// recorded (timers appear to record on drop — see the success path).
    timer: histogram::Timer<E>,
    /// Peer that sent the request (recipient of the response).
    peer: P,
    /// Request id, echoed back so the peer can match response to request.
    id: u64,
    /// Produced bytes, or `RecvError` if the producer dropped the oneshot
    /// without answering.
    result: Result<Bytes, oneshot::error::RecvError>,
}
40
/// Engine that drives peer-to-peer resolution of keyed data.
///
/// It requests values for keys from peers (tracked by the [Fetcher]),
/// serves inbound peer requests by producing values locally (via [Producer]),
/// and hands fetched values — or failure notices — to the [Consumer].
pub struct Engine<
    E: BufferPooler + Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Provider<PublicKey = P>,
    B: Blocker<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Runtime context (clock, spawner, RNG, metrics, network buffer pool).
    context: ContextCell<E>,

    /// Receives fetched values and failure notifications.
    consumer: Con,

    /// Produces values for keys requested by peers; cloned per request.
    producer: Pro,

    /// Source of peer-set updates used to reconcile the fetchable peer set.
    peer_provider: D,

    /// Used to block peers that return invalid data.
    blocker: B,

    /// Highest peer-set index already reconciled (`None` before the first
    /// update); guards against applying stale or duplicate updates.
    last_peer_set_id: Option<u64>,

    /// Inbound commands from the [Mailbox] handle returned by `new`.
    mailbox: mpsc::Receiver<Message<Key, P>>,

    /// Tracks pending/active fetches, retries, targets, and blocked peers.
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Per-key timers measuring end-to-end fetch duration. Invariant: kept
    /// in lockstep with the fetcher's key set (asserted in the run loop).
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// In-flight serve operations (concurrent producer calls).
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses to peers are sent with network priority.
    priority_responses: bool,

    /// Counters and histograms for fetch, serve, and cancel activity.
    metrics: metrics::Metrics<E>,

    /// Pins the receiver type parameter; the receiver itself is only held
    /// by `run`'s `network` argument, never stored.
    _r: PhantomData<NetR>,
}
95
impl<
        E: BufferPooler + Clock + Spawner + Rng + Metrics,
        P: PublicKey,
        D: Provider<PublicKey = P>,
        B: Blocker<PublicKey = P>,
        Key: Span,
        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
        Pro: Producer<Key = Key>,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
{
    /// Creates a new [Engine] from `cfg`, returning it together with the
    /// [Mailbox] callers use to fetch, cancel, retain, or clear keys.
    pub fn new(context: E, cfg: Config<P, D, B, Key, Con, Pro>) -> (Self, Mailbox<Key, P>) {
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);

        let metrics = metrics::Metrics::init(context.clone());
        let fetcher = Fetcher::new(
            context.with_label("fetcher"),
            FetcherConfig {
                me: cfg.me,
                initial: cfg.initial,
                timeout: cfg.timeout,
                retry_timeout: cfg.fetch_retry_timeout,
                priority_requests: cfg.priority_requests,
            },
        );
        (
            Self {
                context: ContextCell::new(context),
                consumer: cfg.consumer,
                producer: cfg.producer,
                peer_provider: cfg.peer_provider,
                blocker: cfg.blocker,
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                fetch_timers: HashMap::new(),
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

    /// Spawns the engine's event loop on the runtime, consuming `self`.
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// The engine's event loop.
    ///
    /// Multiplexes five event sources — peer-set updates, fetch deadlines,
    /// mailbox commands, completed serves, and inbound network messages —
    /// and exits when the mailbox, subscription, or network closes (or the
    /// runtime signals shutdown).
    async fn run(mut self, network: (NetS, NetR)) {
        // Wrap the raw network pair with the wire codec, backed by the
        // runtime's shared network buffer pool.
        let (mut sender, mut receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );
        let peer_set_subscription = &mut self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            // Runs before every select round: refresh gauges and rebuild
            // the deadline futures the branches below race against.
            on_start => {
                // try_set failures are ignored: gauges are best-effort.
                let _ = self
                    .metrics
                    .fetch_pending
                    .try_set(self.fetcher.len_pending());
                let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
                let _ = self
                    .metrics
                    .peers_blocked
                    .try_set(self.fetcher.len_blocked());
                let _ = self.metrics.serve_processing.try_set(self.serves.len());

                // No deadline => a never-resolving future so that branch
                // simply stays dormant this round.
                let deadline_pending = match self.fetcher.get_pending_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };

                let deadline_active = match self.fetcher.get_active_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
            },
            // Runtime shutdown: drop all in-flight serves and exit.
            on_stopped => {
                debug!("shutdown");
                self.serves.cancel_all();
            },
            // Peer-set update: reconcile the fetcher's view of usable peers.
            Some(update) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                return;
            } => {
                // Option ordering makes None < Some(_), so the very first
                // update always passes; stale/duplicate indices are skipped.
                if self.last_peer_set_id < Some(update.index) {
                    self.last_peer_set_id = Some(update.index);
                    self.fetcher.reconcile(update.latest.primary.as_ref());
                }
            },
            // An active (in-flight) request timed out: count the failure
            // and queue the key for retry with another peer.
            _ = deadline_active => {
                if let Some(key) = self.fetcher.pop_active() {
                    debug!(?key, "requester timeout");
                    self.metrics.fetch.inc(Status::Failure);
                    self.fetcher.add_retry(key);
                }
            },
            // A pending (queued) fetch became due: issue it on the network.
            _ = deadline_pending => {
                self.fetcher.fetch(&mut sender).await;
            },
            // Command from the mailbox handle.
            Some(msg) = self.mailbox.recv() else {
                error!("mailbox closed");
                return;
            } => {
                match msg {
                    Message::Fetch(requests) => {
                        for FetchRequest { key, targets } in requests {
                            trace!(?key, "mailbox: fetch");

                            // A key is "new" iff it has no timer; timers
                            // mirror the fetcher's key set exactly.
                            let is_new = !self.fetch_timers.contains_key(&key);

                            match targets {
                                Some(targets) => {
                                    // NOTE(review): targets are adopted only
                                    // for a new fetch or one that is already
                                    // target-restricted — an existing
                                    // untargeted fetch stays unrestricted;
                                    // presumably deliberate, confirm against
                                    // Fetcher semantics.
                                    if is_new || self.fetcher.has_targets(&key) {
                                        self.fetcher.add_targets(key.clone(), targets);
                                    }
                                }
                                None => self.fetcher.clear_targets(&key),
                            }

                            if is_new {
                                // Start timing the end-to-end fetch and queue
                                // the key for immediate dispatch.
                                self.fetch_timers
                                    .insert(key.clone(), self.metrics.fetch_duration.timer());
                                self.fetcher.add_ready(key);
                            } else {
                                trace!(?key, "updated targets for existing fetch");
                            }
                        }
                    }
                    Message::Cancel { key } => {
                        trace!(?key, "mailbox: cancel");
                        // Dropped = key was not being fetched; upgraded to
                        // Success if the fetcher actually canceled it.
                        let mut guard = self.metrics.cancel.guard(Status::Dropped);
                        if self.fetcher.cancel(&key) {
                            guard.set(Status::Success);
                            // unwrap upheld by the timers/fetcher lockstep
                            // invariant; cancel() suppresses recording a
                            // duration for the abandoned fetch.
                            self.fetch_timers.remove(&key).unwrap().cancel();
                            self.consumer.failed(key.clone(), ()).await;
                        }
                    }
                    Message::Retain { predicate } => {
                        trace!("mailbox: retain");

                        // Drop every fetch whose key fails the predicate.
                        self.fetcher.retain(&predicate);

                        let before = self.fetch_timers.len();
                        // Remove matching timers and notify the consumer of
                        // each abandoned key.
                        let removed = self
                            .fetch_timers
                            .extract_if(|k, _| !predicate(k))
                            .collect::<Vec<_>>();
                        for (key, timer) in removed {
                            timer.cancel();
                            self.consumer.failed(key, ()).await;
                        }

                        // Re-derive the count (shadows the drained Vec).
                        let removed = (before - self.fetch_timers.len()) as u64;
                        if removed == 0 {
                            self.metrics.cancel.inc(Status::Dropped);
                        } else {
                            self.metrics.cancel.inc_by(Status::Success, removed);
                        }
                    }
                    Message::Clear => {
                        trace!("mailbox: clear");

                        // Drop every outstanding fetch.
                        self.fetcher.clear();

                        let removed = self.fetch_timers.len() as u64;
                        for (key, timer) in self.fetch_timers.drain() {
                            timer.cancel();
                            self.consumer.failed(key, ()).await;
                        }

                        if removed == 0 {
                            self.metrics.cancel.inc(Status::Dropped);
                        } else {
                            self.metrics.cancel.inc_by(Status::Success, removed);
                        }
                    }
                }
                // Invariant: one timer per outstanding fetch, always.
                assert_eq!(self.fetcher.len(), self.fetch_timers.len());
            },
            // A serve finished producing (or failed): record metrics and
            // reply to the requesting peer.
            serve = self.serves.next_completed() => {
                let Serve {
                    timer,
                    peer,
                    id,
                    result,
                } = serve;

                match result {
                    Ok(_) => {
                        // Timer drops at end of scope, recording the serve
                        // duration (cancel is only called on failure).
                        self.metrics.serve.inc(Status::Success);
                    }
                    Err(ref err) => {
                        debug!(?err, ?peer, ?id, "serve failed");
                        timer.cancel();
                        self.metrics.serve.inc(Status::Failure);
                    }
                }

                self.handle_serve(&mut sender, peer, id, result, self.priority_responses)
                    .await;
            },
            // Inbound network traffic.
            msg = receiver.recv() => {
                // Transport failure is fatal for the engine.
                let (peer, msg) = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        error!(?err, "receiver closed");
                        return;
                    }
                };

                // A malformed message from one peer is merely skipped.
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        trace!(?err, ?peer, "decode failed");
                        continue;
                    }
                };
                match msg.payload {
                    wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key),
                    wire::Payload::Response(response) => {
                        self.handle_network_response(peer, msg.id, response).await
                    }
                    wire::Payload::Error => self.handle_network_error_response(peer, msg.id),
                };
            },
        }
    }

    /// Sends the outcome of a serve back to `peer`: the produced bytes on
    /// success, or a wire-level error marker if the producer gave up, so
    /// the requester can retry elsewhere instead of waiting for a timeout.
    async fn handle_serve(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Result<Bytes, oneshot::error::RecvError>,
        priority: bool,
    ) {
        let payload: wire::Payload<Key> = response.map_or_else(
            |_| wire::Payload::Error,
            |data| wire::Payload::Response(data),
        );
        let msg = wire::Message { id, payload };

        let result = sender
            .send(Recipients::One(peer.clone()), msg, priority)
            .await;

        // Best-effort delivery: failures are only logged. An Ok with an
        // empty recipient list means the peer was no longer reachable.
        match result {
            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
            Ok(_) => trace!(?peer, ?id, "serve sent"),
        };
    }

    /// Handles a peer's request for `key`: kicks off production on a
    /// cloned producer and parks the pending result in the serves pool
    /// (the run loop replies when it completes).
    fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        trace!(?peer, ?id, "peer request");
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer();
        self.serves.push(async move {
            let receiver = producer.produce(key).await;
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

    /// Handles a data response from `peer` for request `id`.
    ///
    /// Delivers the bytes to the consumer; if the consumer rejects them,
    /// the peer is blocked and the fetch is retried with another peer.
    async fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
        trace!(?peer, ?id, "peer response: data");

        // Unknown id / wrong peer => stale or unsolicited; ignore.
        // NOTE(review): the `true` flag presumably marks this as a data
        // (vs. error) response — confirm against Fetcher::pop_by_id.
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            return;
        };

        if self.consumer.deliver(key.clone(), response).await {
            self.metrics.fetch.inc(Status::Success);
            // Dropping the removed timer records the fetch duration
            // (unwrap upheld by the timers/fetcher lockstep invariant).
            self.fetch_timers.remove(&key).unwrap();
            self.fetcher.clear_targets(&key);
            return;
        }

        // Consumer rejected the data: the peer is faulty or malicious.
        commonware_p2p::block!(self.blocker, peer.clone(), "invalid data received");
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }

    /// Handles an explicit error response from `peer` for request `id`:
    /// counts the failure and queues the key for retry with another peer.
    fn handle_network_error_response(&mut self, peer: P, id: u64) {
        trace!(?peer, ?id, "peer response: error");

        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            return;
        };

        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }
}
457}