1use super::{
2 config::Config,
3 fetcher::{Config as FetcherConfig, Fetcher},
4 ingress::{FetchRequest, Mailbox, Message},
5 metrics, wire, Producer,
6};
7use crate::Consumer;
8use bytes::Bytes;
9use commonware_cryptography::PublicKey;
10use commonware_macros::select_loop;
11use commonware_p2p::{
12 utils::codec::{wrap, WrappedSender},
13 Blocker, Provider, Receiver, Recipients, Sender,
14};
15use commonware_runtime::{
16 spawn_cell,
17 telemetry::metrics::{
18 histogram,
19 status::{CounterExt, GaugeExt, Status},
20 },
21 Clock, ContextCell, Handle, Metrics, Spawner,
22};
23use commonware_utils::{
24 channel::{mpsc, oneshot},
25 futures::Pool as FuturesPool,
26 Span,
27};
28use futures::future::{self, Either};
29use rand::Rng;
30use std::{collections::HashMap, marker::PhantomData};
31use tracing::{debug, error, trace, warn};
32
/// Outcome of a single serve operation, yielded by the engine's serve futures pool.
struct Serve<E: Clock, P: PublicKey> {
    /// Duration timer started when the request was received; cancelled if the serve fails.
    timer: histogram::Timer<E>,
    /// Peer that sent the request.
    peer: P,
    /// Request id, echoed back in the response so the peer can match it.
    id: u64,
    /// The produced data, or the error received when the producer dropped the request.
    result: Result<Bytes, oneshot::error::RecvError>,
}
40
/// Fetches keyed data from peers and serves peer requests, wiring a [Consumer],
/// [Producer], peer [Provider], and network channels together behind a [Mailbox].
pub struct Engine<
    E: Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Provider<PublicKey = P>,
    B: Blocker<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Runtime context the engine's event loop is spawned on.
    context: ContextCell<E>,

    /// Receives fetched values (`deliver`) and fetch-failure notifications (`failed`).
    consumer: Con,

    /// Produces values for keys requested by peers.
    producer: Pro,

    /// Supplies the peer-set subscription used to reconcile the fetcher.
    provider: D,

    /// Blocks peers whose responses the consumer rejects.
    blocker: B,

    /// Highest peer-set id reconciled so far; `None` until the first update.
    last_peer_set_id: Option<u64>,

    /// Receives fetch/cancel/retain/clear messages submitted through the [Mailbox].
    mailbox: mpsc::Receiver<Message<Key, P>>,

    /// Tracks pending and active fetches, retry scheduling, per-key targets,
    /// and blocked peers.
    fetcher: Fetcher<E, P, Key, NetS>,

    /// One duration timer per in-flight fetch key. Invariant: this map's key
    /// set matches the fetcher's (asserted after every mailbox message).
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// In-flight serve futures awaiting the producer's response.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses to peers are sent with network priority.
    priority_responses: bool,

    /// Engine metrics (fetch/serve/cancel counters, gauges, durations).
    metrics: metrics::Metrics<E>,

    /// Binds the otherwise-unused `NetR` type parameter.
    _r: PhantomData<NetR>,
}
95
96impl<
97 E: Clock + Spawner + Rng + Metrics,
98 P: PublicKey,
99 D: Provider<PublicKey = P>,
100 B: Blocker<PublicKey = P>,
101 Key: Span,
102 Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
103 Pro: Producer<Key = Key>,
104 NetS: Sender<PublicKey = P>,
105 NetR: Receiver<PublicKey = P>,
106 > Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
107{
108 pub fn new(context: E, cfg: Config<P, D, B, Key, Con, Pro>) -> (Self, Mailbox<Key, P>) {
112 let (sender, receiver) = mpsc::channel(cfg.mailbox_size);
113
114 let metrics = metrics::Metrics::init(context.clone());
116 let fetcher = Fetcher::new(
117 context.with_label("fetcher"),
118 FetcherConfig {
119 me: cfg.me,
120 initial: cfg.initial,
121 timeout: cfg.timeout,
122 retry_timeout: cfg.fetch_retry_timeout,
123 priority_requests: cfg.priority_requests,
124 },
125 );
126 (
127 Self {
128 context: ContextCell::new(context),
129 consumer: cfg.consumer,
130 producer: cfg.producer,
131 provider: cfg.provider,
132 blocker: cfg.blocker,
133 last_peer_set_id: None,
134 mailbox: receiver,
135 fetcher,
136 serves: FuturesPool::default(),
137 priority_responses: cfg.priority_responses,
138 metrics,
139 fetch_timers: HashMap::new(),
140 _r: PhantomData,
141 },
142 Mailbox::new(sender),
143 )
144 }
145
146 pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
152 spawn_cell!(self.context, self.run(network).await)
153 }
154
155 async fn run(mut self, network: (NetS, NetR)) {
157 let peer_set_subscription = &mut self.provider.subscribe().await;
158
159 let (mut sender, mut receiver) = wrap((), network.0, network.1);
161
162 select_loop! {
163 self.context,
164 on_start => {
165 let _ = self
167 .metrics
168 .fetch_pending
169 .try_set(self.fetcher.len_pending());
170 let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
171 let _ = self
172 .metrics
173 .peers_blocked
174 .try_set(self.fetcher.len_blocked());
175 let _ = self.metrics.serve_processing.try_set(self.serves.len());
176
177 let deadline_pending = match self.fetcher.get_pending_deadline() {
179 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
180 None => Either::Right(future::pending()),
181 };
182
183 let deadline_active = match self.fetcher.get_active_deadline() {
185 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
186 None => Either::Right(future::pending()),
187 };
188 },
189 on_stopped => {
190 debug!("shutdown");
191 self.serves.cancel_all();
192 },
193 Some((id, _, all)) = peer_set_subscription.recv() else {
195 debug!("peer set subscription closed");
196 return;
197 } => {
198 if self.last_peer_set_id < Some(id) {
201 self.last_peer_set_id = Some(id);
202 self.fetcher.reconcile(all.as_ref());
203 }
204 },
205 _ = deadline_active => {
207 if let Some(key) = self.fetcher.pop_active() {
208 debug!(?key, "requester timeout");
209 self.metrics.fetch.inc(Status::Failure);
210 self.fetcher.add_retry(key);
211 }
212 },
213 _ = deadline_pending => {
215 self.fetcher.fetch(&mut sender).await;
216 },
217 Some(msg) = self.mailbox.recv() else {
219 error!("mailbox closed");
220 return;
221 } => {
222 match msg {
223 Message::Fetch(requests) => {
224 for FetchRequest { key, targets } in requests {
225 trace!(?key, "mailbox: fetch");
226
227 let is_new = !self.fetch_timers.contains_key(&key);
229
230 match targets {
232 Some(targets) => {
233 if is_new || self.fetcher.has_targets(&key) {
237 self.fetcher.add_targets(key.clone(), targets);
238 }
239 }
240 None => self.fetcher.clear_targets(&key),
241 }
242
243 if is_new {
245 self.fetch_timers
246 .insert(key.clone(), self.metrics.fetch_duration.timer());
247 self.fetcher.add_ready(key);
248 } else {
249 trace!(?key, "updated targets for existing fetch");
250 }
251 }
252 }
253 Message::Cancel { key } => {
254 trace!(?key, "mailbox: cancel");
255 let mut guard = self.metrics.cancel.guard(Status::Dropped);
256 if self.fetcher.cancel(&key) {
257 guard.set(Status::Success);
258 self.fetch_timers.remove(&key).unwrap().cancel(); self.consumer.failed(key.clone(), ()).await;
260 }
261 }
262 Message::Retain { predicate } => {
263 trace!("mailbox: retain");
264
265 self.fetcher.retain(&predicate);
267
268 let before = self.fetch_timers.len();
270 let removed = self
271 .fetch_timers
272 .extract_if(|k, _| !predicate(k))
273 .collect::<Vec<_>>();
274 for (key, timer) in removed {
275 timer.cancel();
276 self.consumer.failed(key, ()).await;
277 }
278
279 let removed = (before - self.fetch_timers.len()) as u64;
281 if removed == 0 {
282 self.metrics.cancel.inc(Status::Dropped);
283 } else {
284 self.metrics.cancel.inc_by(Status::Success, removed);
285 }
286 }
287 Message::Clear => {
288 trace!("mailbox: clear");
289
290 self.fetcher.clear();
292
293 let removed = self.fetch_timers.len() as u64;
295 for (key, timer) in self.fetch_timers.drain() {
296 timer.cancel();
297 self.consumer.failed(key, ()).await;
298 }
299
300 if removed == 0 {
302 self.metrics.cancel.inc(Status::Dropped);
303 } else {
304 self.metrics.cancel.inc_by(Status::Success, removed);
305 }
306 }
307 }
308 assert_eq!(self.fetcher.len(), self.fetch_timers.len());
309 },
310 serve = self.serves.next_completed() => {
312 let Serve {
313 timer,
314 peer,
315 id,
316 result,
317 } = serve;
318
319 match result {
321 Ok(_) => {
322 self.metrics.serve.inc(Status::Success);
323 }
324 Err(ref err) => {
325 debug!(?err, ?peer, ?id, "serve failed");
326 timer.cancel();
327 self.metrics.serve.inc(Status::Failure);
328 }
329 }
330
331 self.handle_serve(&mut sender, peer, id, result, self.priority_responses)
333 .await;
334 },
335 msg = receiver.recv() => {
337 let (peer, msg) = match msg {
339 Ok(msg) => msg,
340 Err(err) => {
341 error!(?err, "receiver closed");
342 return;
343 }
344 };
345
346 let msg = match msg {
348 Ok(msg) => msg,
349 Err(err) => {
350 trace!(?err, ?peer, "decode failed");
351 continue;
352 }
353 };
354 match msg.payload {
355 wire::Payload::Request(key) => {
356 self.handle_network_request(peer, msg.id, key).await
357 }
358 wire::Payload::Response(response) => {
359 self.handle_network_response(peer, msg.id, response).await
360 }
361 wire::Payload::Error => self.handle_network_error_response(peer, msg.id).await,
362 };
363 },
364 }
365 }
366
367 async fn handle_serve(
369 &mut self,
370 sender: &mut WrappedSender<NetS, wire::Message<Key>>,
371 peer: P,
372 id: u64,
373 response: Result<Bytes, oneshot::error::RecvError>,
374 priority: bool,
375 ) {
376 let payload: wire::Payload<Key> = response.map_or_else(
378 |_| wire::Payload::Error,
379 |data| wire::Payload::Response(data),
380 );
381 let msg = wire::Message { id, payload };
382
383 let result = sender
385 .send(Recipients::One(peer.clone()), msg, priority)
386 .await;
387
388 match result {
390 Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
391 Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
392 Ok(_) => trace!(?peer, ?id, "serve sent"),
393 };
394 }
395
396 async fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
398 trace!(?peer, ?id, "peer request");
400 let mut producer = self.producer.clone();
401 let timer = self.metrics.serve_duration.timer();
402 self.serves.push(async move {
403 let receiver = producer.produce(key).await;
404 let result = receiver.await;
405 Serve {
406 timer,
407 peer,
408 id,
409 result,
410 }
411 });
412 }
413
414 async fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
416 trace!(?peer, ?id, "peer response: data");
417
418 let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
420 return;
422 };
423
424 if self.consumer.deliver(key.clone(), response).await {
426 self.metrics.fetch.inc(Status::Success);
428 self.fetch_timers.remove(&key).unwrap(); self.fetcher.clear_targets(&key);
432 return;
433 }
434
435 self.blocker.block(peer.clone()).await;
438 self.fetcher.block(peer);
439 self.metrics.fetch.inc(Status::Failure);
440 self.fetcher.add_retry(key);
441 }
442
443 async fn handle_network_error_response(&mut self, peer: P, id: u64) {
445 trace!(?peer, ?id, "peer response: error");
446
447 let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
449 return;
451 };
452
453 self.metrics.fetch.inc(Status::Failure);
455 self.fetcher.add_retry(key);
456 }
457}