use super::{
    config::Config,
    fetcher::Fetcher,
    ingress::{Mailbox, Message},
    metrics, wire, Producer,
};
use crate::Consumer;
use bytes::Bytes;
use commonware_cryptography::PublicKey;
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Manager, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{futures::Pool as FuturesPool, Span};
use futures::{
    channel::{mpsc, oneshot},
    future::{self, Either},
    StreamExt,
};
use governor::clock::Clock as GClock;
use rand::Rng;
use std::{collections::HashMap, marker::PhantomData};
use tracing::{debug, error, trace, warn};

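/// The outcome of asking the producer for data, along with the peer and request id
/// that the response should be sent back to.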
struct Serve<E: Clock, P: PublicKey> {
    /// Measures the time taken to produce the response.
    timer: histogram::Timer<E>,
    /// The peer that requested the data.
    peer: P,
    /// The id of the request.
    id: u64,
    /// The produced data, or an error if the producer dropped the request.
    result: Result<Bytes, oneshot::Canceled>,
}

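/// Engine that fetches data from other peers and serves data to them.
///
/// Fetch requests arrive via the [Mailbox], are dispatched to peers by the internal
/// [Fetcher], and completed fetches are delivered to the [Consumer]. Requests from
/// other peers are answered using the [Producer].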
pub struct Engine<
    E: Clock + GClock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Manager<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Runtime context.
    context: ContextCell<E>,

    /// Notified when a fetch completes (`deliver`) or is cancelled (`failed`).
    consumer: Con,

    /// Produces data to serve requests from other peers.
    producer: Pro,

    /// Provides peer set updates used to reconcile the fetcher.
    manager: D,

    /// The id of the most recently applied peer set (used to ignore stale updates).
    last_peer_set_id: Option<u64>,

    /// Receives messages sent via the [Mailbox].
    mailbox: mpsc::Receiver<Message<Key>>,

    /// Tracks pending and active fetch requests.
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Timers measuring the duration of each in-flight fetch.
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// Pending serve futures, resolved once the producer has produced (or dropped) the data.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether to send responses as priority messages.
    priority_responses: bool,

    /// Metrics for the engine.
    metrics: metrics::Metrics<E>,

    _s: PhantomData<NetS>,
    _r: PhantomData<NetR>,
}

impl<
        E: Clock + GClock + Spawner + Rng + Metrics,
        P: PublicKey,
        D: Manager<PublicKey = P>,
        Key: Span,
        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
        Pro: Producer<Key = Key>,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, Key, Con, Pro, NetS, NetR>
{
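    /// Creates a new engine with the given configuration.
    ///
    /// Returns the engine along with the [Mailbox] used to send it messages.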
    pub fn new(context: E, cfg: Config<P, D, Key, Con, Pro>) -> (Self, Mailbox<Key>) {
        // Create the channel used by the mailbox to send messages to the engine.
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);

        // Initialize metrics and the fetcher.
        let metrics = metrics::Metrics::init(context.clone());
        let fetcher = Fetcher::new(
            context.with_label("fetcher"),
            cfg.requester_config,
            cfg.fetch_retry_timeout,
            cfg.priority_requests,
        );
        (
            Self {
                context: ContextCell::new(context),
                consumer: cfg.consumer,
                producer: cfg.producer,
                manager: cfg.manager,
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                fetch_timers: HashMap::new(),
                _s: PhantomData,
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

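    /// Starts the engine with the given network sender and receiver.
    ///
    /// The engine runs until its context is stopped.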
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

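    /// The main event loop of the engine.
    ///
    /// Processes mailbox messages, peer set updates, completed serves, incoming network
    /// messages, and fetch retry/timeout deadlines until shutdown.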
    async fn run(mut self, network: (NetS, NetR)) {
        let mut shutdown = self.context.stopped();
        let peer_set_subscription = &mut self.manager.subscribe().await;

        // Wrap the network sender and receiver to encode/decode wire messages.
        let (mut sender, mut receiver) = wrap((), network.0, network.1);

        loop {
            // Update metrics that reflect the current state of the engine.
            self.metrics
                .fetch_pending
                .set(self.fetcher.len_pending() as i64);
            self.metrics
                .fetch_active
                .set(self.fetcher.len_active() as i64);
            self.metrics
                .peers_blocked
                .set(self.fetcher.len_blocked() as i64);
            self.metrics.serve_processing.set(self.serves.len() as i64);

            // Compute the next deadline for retrying a pending fetch.
            let deadline_pending = match self.fetcher.get_pending_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Compute the next deadline for timing out an active fetch.
            let deadline_active = match self.fetcher.get_active_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            select! {
                // Shutdown signal received: drop all outstanding serves and exit.
                _ = &mut shutdown => {
                    debug!("shutdown");
                    self.serves.cancel_all();
                    return;
                },

                // The peer set changed.
                peer_set_update = peer_set_subscription.next() => {
                    let Some((id, _, all)) = peer_set_update else {
                        debug!("peer set subscription closed");
                        return;
                    };

                    // Only reconcile if the peer set is newer than the last one applied.
                    if self.last_peer_set_id < Some(id) {
                        self.last_peer_set_id = Some(id);
                        self.fetcher.reconcile(all.as_ref());
                    }
                },

                // A message was received from the mailbox.
                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        error!("mailbox closed");
                        return;
                    };
                    match msg {
                        Message::Fetch { key } => {
                            trace!(?key, "mailbox: fetch");

                            // Drop duplicate fetch requests.
                            if self.fetch_timers.contains_key(&key) {
                                trace!(?key, "duplicate fetch");
                                self.metrics.fetch.inc(Status::Dropped);
                                continue;
                            }

                            // Start a timer and initiate the fetch.
                            self.fetch_timers
                                .insert(key.clone(), self.metrics.fetch_duration.timer());
                            self.fetcher.fetch(&mut sender, key, true).await;
                        }
                        Message::Cancel { key } => {
                            trace!(?key, "mailbox: cancel");
                            let mut guard = self.metrics.cancel.guard(Status::Dropped);
                            if self.fetcher.cancel(&key) {
                                guard.set(Status::Success);
                                self.fetch_timers.remove(&key).unwrap().cancel();
                                self.consumer.failed(key.clone(), ()).await;
                            }
                        }
                        Message::Retain { predicate } => {
                            trace!("mailbox: retain");
                            let before = self.fetcher.len();
                            self.fetcher.retain(predicate);
                            let after = self.fetcher.len();
                            if before == after {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics
                                    .cancel
                                    .inc_by(Status::Success, before.checked_sub(after).unwrap() as u64);
                            }
                        }
                        Message::Clear => {
                            trace!("mailbox: clear");
                            let before = self.fetcher.len();
                            self.fetcher.clear();
                            let after = self.fetcher.len();
                            if before == after {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics
                                    .cancel
                                    .inc_by(Status::Success, before.checked_sub(after).unwrap() as u64);
                            }
                        }
                    }
                },

                // A serve (produce) future completed.
                serve = self.serves.next_completed() => {
                    let Serve { timer, peer, id, result } = serve;

                    // Record whether the producer succeeded or failed.
                    match result {
                        Ok(_) => {
                            self.metrics.serve.inc(Status::Success);
                        }
                        Err(err) => {
                            debug!(?err, ?peer, ?id, "serve failed");
                            timer.cancel();
                            self.metrics.serve.inc(Status::Failure);
                        }
                    }

                    // Send the response (or an error) back to the requesting peer.
                    self.handle_serve(&mut sender, peer, id, result, self.priority_responses)
                        .await;
                },

                // A message was received from the network.
                msg = receiver.recv() => {
                    // Return if the receiver is closed.
                    let (peer, msg) = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            error!(?err, "receiver closed");
                            return;
                        }
                    };

                    // Skip messages that fail to decode.
                    let msg = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            trace!(?err, ?peer, "decode failed");
                            continue;
                        }
                    };
                    match msg.payload {
                        wire::Payload::Request(key) => {
                            self.handle_network_request(peer, msg.id, key).await
                        }
                        wire::Payload::Response(response) => {
                            self.handle_network_response(&mut sender, peer, msg.id, response)
                                .await
                        }
                        wire::Payload::ErrorResponse => {
                            self.handle_network_error_response(&mut sender, peer, msg.id).await
                        }
                    };
                },

                // A pending fetch is due to be retried.
                _ = deadline_pending => {
                    let key = self.fetcher.pop_pending();
                    debug!(?key, "retrying");
                    self.metrics.fetch.inc(Status::Failure);
                    self.fetcher.fetch(&mut sender, key, false).await;
                },

                // An active fetch timed out.
                _ = deadline_active => {
                    if let Some(key) = self.fetcher.pop_active() {
                        debug!(?key, "requester timeout");
                        self.metrics.fetch.inc(Status::Failure);
                        self.fetcher.fetch(&mut sender, key, false).await;
                    }
                },
            }
        }
    }

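    /// Sends the produced data (or an error response) for request `id` back to `peer`.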
    async fn handle_serve(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Result<Bytes, oneshot::Canceled>,
        priority: bool,
    ) {
        // Encode the response (or an error) as a wire message.
        let payload: wire::Payload<Key> = match response {
            Ok(data) => wire::Payload::Response(data),
            Err(_) => wire::Payload::ErrorResponse,
        };
        let msg = wire::Message { id, payload };

        // Attempt to send the response to the peer.
        let result = sender
            .send(Recipients::One(peer.clone()), msg, priority)
            .await;

        // Log the result of the send.
        match result {
            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
            Ok(_) => trace!(?peer, ?id, "serve sent"),
        };
    }

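    /// Handles a request for data from a peer.
    ///
    /// Asks the producer for the data and tracks the pending serve; the response is sent
    /// once the produce future resolves.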
    async fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        trace!(?peer, ?id, "peer request");

        // Ask the producer for the data and track the pending serve.
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer();
        self.serves.push(async move {
            let receiver = producer.produce(key).await;
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

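    /// Handles a response from a peer that contains data.
    ///
    /// If the consumer accepts the data, the fetch is complete; otherwise the peer is
    /// blocked and the fetch is retried.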
    async fn handle_network_response(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Bytes,
    ) {
        trace!(?peer, ?id, "peer response: data");

        // Ignore the response if the request id is not known for this peer.
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            return;
        };

        // If the consumer accepts the data, the fetch is complete.
        if self.consumer.deliver(key.clone(), response).await {
            self.metrics.fetch.inc(Status::Success);
            self.fetch_timers.remove(&key).unwrap();
            return;
        }

        // The consumer rejected the data: block the peer and retry the fetch.
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.fetch(sender, key, false).await;
    }

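    /// Handles an error response from a peer, recording the failure and retrying the fetch.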
    async fn handle_network_error_response(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
    ) {
        trace!(?peer, ?id, "peer response: error");

        // Ignore the response if the request id is not known for this peer.
        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            return;
        };

        // Record the failure and retry the fetch.
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.fetch(sender, key, false).await;
    }
}