1use super::{
2 config::Config,
3 fetcher::Fetcher,
4 ingress::{Mailbox, Message},
5 metrics, wire, Coordinator, Producer,
6};
7use crate::Consumer;
8use bytes::Bytes;
9use commonware_cryptography::PublicKey;
10use commonware_macros::select;
11use commonware_p2p::{
12 utils::codec::{wrap, WrappedSender},
13 Receiver, Recipients, Sender,
14};
15use commonware_runtime::{
16 telemetry::metrics::{
17 histogram,
18 status::{CounterExt, Status},
19 },
20 Clock, Handle, Metrics, Spawner,
21};
22use commonware_utils::{futures::Pool as FuturesPool, Array};
23use futures::{
24 channel::{mpsc, oneshot},
25 future::{self, Either},
26 StreamExt,
27};
28use governor::clock::Clock as GClock;
29use rand::Rng;
30use std::{collections::HashMap, marker::PhantomData};
31use tracing::{debug, error, trace, warn};
32
/// Outcome of a single request served on behalf of a remote peer.
///
/// A value of this type is yielded by the engine's serve pool once the producer's
/// response channel resolves; the engine then replies to `peer`.
struct Serve<E: Clock, P: PublicKey> {
    /// Timer taken from the `serve_duration` metric when the request was accepted.
    /// The engine cancels it when `result` is an error.
    timer: histogram::Timer<E>,
    /// Peer that issued the request and that receives the reply.
    peer: P,
    /// Request id, echoed back in the reply message.
    id: u64,
    /// Data yielded by the producer, or `Canceled` if its response channel resolved
    /// without a value.
    result: Result<Bytes, oneshot::Canceled>,
}
40
/// Event-loop state that drives outgoing fetches and serves incoming peer requests.
///
/// Built with `Engine::new` and consumed by `Engine::start`, which spawns the loop.
pub struct Engine<
    E: Clock + GClock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Coordinator<PublicKey = P>,
    Key: Array,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Runtime context: spawns the loop, provides the shutdown signal and deadline sleeps.
    context: E,

    /// Receives fetched data (`deliver`) and is told about cancelled fetches (`failed`).
    consumer: Con,

    /// Cloned once per incoming request and asked to `produce` the requested key.
    producer: Pro,

    /// Source of the current peer set (`peers`) and of its id (`peer_set_id`).
    coordinator: D,

    /// Peer set id seen the last time the fetcher was reconciled; `None` until the loop starts.
    last_peer_set_id: Option<u64>,

    /// Receiving half of the channel whose sending half is wrapped by the `Mailbox`.
    mailbox: mpsc::Receiver<Message<Key>>,

    /// Issues, retries, cancels and matches outgoing fetch requests.
    fetcher: Fetcher<E, P, Key, NetS>,

    /// One `fetch_duration` timer per key currently being fetched; also used to detect
    /// duplicate fetch commands.
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// In-flight serve futures, each resolving to a `Serve` once the producer answers.
    serves: FuturesPool<Serve<E, P>>,

    /// Passed as the `priority` flag when replying to peers.
    priority_responses: bool,

    /// Counters, gauges and histograms updated by the loop.
    metrics: metrics::Metrics<E>,

    /// Marker for the network sender type parameter.
    _s: PhantomData<NetS>,
    /// Marker for the network receiver type parameter, which no field stores.
    _r: PhantomData<NetR>,
}
92
93impl<
94 E: Clock + GClock + Spawner + Rng + Metrics,
95 P: PublicKey,
96 D: Coordinator<PublicKey = P>,
97 Key: Array,
98 Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
99 Pro: Producer<Key = Key>,
100 NetS: Sender<PublicKey = P>,
101 NetR: Receiver<PublicKey = P>,
102 > Engine<E, P, D, Key, Con, Pro, NetS, NetR>
103{
104 pub fn new(context: E, cfg: Config<P, D, Key, Con, Pro>) -> (Self, Mailbox<Key>) {
108 let (sender, receiver) = mpsc::channel(cfg.mailbox_size);
109 let metrics = metrics::Metrics::init(context.clone());
110 let fetcher = Fetcher::new(
111 context.with_label("fetcher"),
112 cfg.requester_config,
113 cfg.fetch_retry_timeout,
114 cfg.priority_requests,
115 );
116 (
117 Self {
118 context,
119 consumer: cfg.consumer,
120 producer: cfg.producer,
121 coordinator: cfg.coordinator,
122 last_peer_set_id: None,
123 mailbox: receiver,
124 fetcher,
125 serves: FuturesPool::default(),
126 priority_responses: cfg.priority_responses,
127 metrics,
128 fetch_timers: HashMap::new(),
129 _s: PhantomData,
130 _r: PhantomData,
131 },
132 Mailbox::new(sender),
133 )
134 }
135
136 pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
142 self.context.spawn_ref()(self.run(network))
143 }
144
145 async fn run(mut self, network: (NetS, NetR)) {
147 let mut shutdown = self.context.stopped();
148
149 let (mut sender, mut receiver) = wrap((), network.0, network.1);
151
152 self.last_peer_set_id = Some(self.coordinator.peer_set_id());
154 self.fetcher.reconcile(self.coordinator.peers());
155
156 loop {
157 self.metrics
159 .fetch_pending
160 .set(self.fetcher.len_pending() as i64);
161 self.metrics
162 .fetch_active
163 .set(self.fetcher.len_active() as i64);
164 self.metrics
165 .peers_blocked
166 .set(self.fetcher.len_blocked() as i64);
167 self.metrics.serve_processing.set(self.serves.len() as i64);
168
169 let peer_set_id = self.coordinator.peer_set_id();
171 if self.last_peer_set_id != Some(peer_set_id) {
172 self.last_peer_set_id = Some(peer_set_id);
173 self.fetcher.reconcile(self.coordinator.peers());
174 }
175
176 let deadline_pending = match self.fetcher.get_pending_deadline() {
178 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
179 None => Either::Right(future::pending()),
180 };
181
182 let deadline_active = match self.fetcher.get_active_deadline() {
184 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
185 None => Either::Right(future::pending()),
186 };
187
188 select! {
190 _ = &mut shutdown => {
191 debug!("shutdown");
192 self.serves.cancel_all();
193 return;
194 },
195
196 msg = self.mailbox.next() => {
198 let Some(msg) = msg else {
199 error!("mailbox closed");
200 return;
201 };
202 match msg {
203 Message::Fetch { key } => {
204 trace!(?key, "mailbox: fetch");
205
206 if self.fetch_timers.contains_key(&key) {
208 trace!(?key, "duplicate fetch");
209 self.metrics.fetch.inc(Status::Dropped);
210 continue;
211 }
212
213 self.fetch_timers.insert(key.clone(), self.metrics.fetch_duration.timer());
215 self.fetcher.fetch(&mut sender, key, true).await;
216 }
217 Message::Cancel { key } => {
218 trace!(?key, "mailbox: cancel");
219 let mut guard = self.metrics.cancel.guard(Status::Dropped);
220 if self.fetcher.cancel(&key) {
221 guard.set(Status::Success);
222 self.fetch_timers.remove(&key).unwrap().cancel(); self.consumer.failed(key.clone(), ()).await;
224 }
225 }
226 }
227 },
228
229 serve = self.serves.next_completed() => {
231 let Serve { timer, peer, id, result } = serve;
232
233 match result {
235 Ok(_) => {
236 self.metrics.serve.inc(Status::Success);
237 }
238 Err(err) => {
239 debug!(?err, ?peer, ?id, "serve failed");
240 timer.cancel();
241 self.metrics.serve.inc(Status::Failure);
242 }
243 }
244
245 self.handle_serve(&mut sender, peer, id, result, self.priority_responses).await;
247 },
248
249 msg = receiver.recv() => {
251 let (peer, msg) = match msg {
253 Ok(msg) => msg,
254 Err(err) => {
255 error!(?err, "receiver closed");
256 return;
257 }
258 };
259
260 let msg = match msg {
262 Ok(msg) => msg,
263 Err(err) => {
264 trace!(?err, ?peer, "decode failed");
265 continue;
266 }
267 };
268 match msg.payload {
269 wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key).await,
270 wire::Payload::Response(response) => self.handle_network_response(&mut sender, peer, msg.id, response).await,
271 wire::Payload::ErrorResponse => self.handle_network_error_response(&mut sender, peer, msg.id).await,
272 };
273 },
274
275 _ = deadline_pending => {
277 let key = self.fetcher.pop_pending();
278 debug!(?key, "retrying");
279 self.metrics.fetch.inc(Status::Failure);
280 self.fetcher.fetch(&mut sender, key, false).await;
281 },
282
283 _ = deadline_active => {
285 if let Some(key) = self.fetcher.pop_active() {
286 debug!(?key, "requester timeout");
287 self.metrics.fetch.inc(Status::Failure);
288 self.fetcher.fetch(&mut sender, key, false).await;
289 }
290 },
291 }
292 }
293 }
294
295 async fn handle_serve(
297 &mut self,
298 sender: &mut WrappedSender<NetS, wire::Message<Key>>,
299 peer: P,
300 id: u64,
301 response: Result<Bytes, oneshot::Canceled>,
302 priority: bool,
303 ) {
304 let payload: wire::Payload<Key> = match response {
306 Ok(data) => wire::Payload::Response(data),
307 Err(_) => wire::Payload::ErrorResponse,
308 };
309 let msg = wire::Message { id, payload };
310
311 let result = sender
313 .send(Recipients::One(peer.clone()), msg, priority)
314 .await;
315
316 match result {
318 Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
319 Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
320 Ok(_) => trace!(?peer, ?id, "serve sent"),
321 };
322 }
323
324 async fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
326 trace!(?peer, ?id, "peer request");
328 let mut producer = self.producer.clone();
329 let timer = self.metrics.serve_duration.timer();
330 self.serves.push(async move {
331 let receiver = producer.produce(key).await;
332 let result = receiver.await;
333 Serve {
334 timer,
335 peer,
336 id,
337 result,
338 }
339 });
340 }
341
342 async fn handle_network_response(
344 &mut self,
345 sender: &mut WrappedSender<NetS, wire::Message<Key>>,
346 peer: P,
347 id: u64,
348 response: Bytes,
349 ) {
350 trace!(?peer, ?id, "peer response: data");
351
352 let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
354 return;
356 };
357
358 if self.consumer.deliver(key.clone(), response).await {
360 self.metrics.fetch.inc(Status::Success);
362 self.fetch_timers.remove(&key).unwrap(); return;
364 }
365
366 self.fetcher.block(peer);
368 self.metrics.fetch.inc(Status::Failure);
369 self.fetcher.fetch(sender, key, false).await;
370 }
371
372 async fn handle_network_error_response(
374 &mut self,
375 sender: &mut WrappedSender<NetS, wire::Message<Key>>,
376 peer: P,
377 id: u64,
378 ) {
379 trace!(?peer, ?id, "peer response: error");
380
381 let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
383 return;
385 };
386
387 self.metrics.fetch.inc(Status::Failure);
389 self.fetcher.fetch(sender, key, false).await;
391 }
392}