// commonware_resolver/p2p/engine.rs

use super::{
    config::Config,
    fetcher::{Config as FetcherConfig, Fetcher},
    ingress::{FetchRequest, Mailbox, Message},
    metrics, wire, Producer,
};
use crate::Consumer;
use bytes::Bytes;
use commonware_cryptography::PublicKey;
use commonware_macros::select_loop;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Provider, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, GaugeExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{
    channel::{mpsc, oneshot},
    futures::Pool as FuturesPool,
    Span,
};
use futures::future::{self, Either};
use rand::Rng;
use std::{collections::HashMap, marker::PhantomData};
use tracing::{debug, error, trace, warn};

/// Represents a pending serve operation.
struct Serve<E: Clock, P: PublicKey> {
    timer: histogram::Timer<E>,
    peer: P,
    id: u64,
    result: Result<Bytes, oneshot::error::RecvError>,
}

/// Manages incoming and outgoing P2P requests, coordinating fetch and serve operations.
pub struct Engine<
    E: Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Provider<PublicKey = P>,
    B: Blocker<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Context used to spawn tasks, manage time, etc.
    context: ContextCell<E>,

    /// Consumes data that is fetched from the network
    consumer: Con,

    /// Produces data for incoming requests
    producer: Pro,

    /// Manages the list of peers that can be used to fetch data
    provider: D,

    /// The blocker that will be used to block peers that send invalid responses
    blocker: B,

    /// Used to detect changes in the peer set
    last_peer_set_id: Option<u64>,

    /// Mailbox that makes and cancels fetch requests
    mailbox: mpsc::Receiver<Message<Key, P>>,

    /// Manages outgoing fetch requests
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Tracks the start time of fetch operations
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// Holds futures that resolve once the `Producer` has produced the data.
    /// Once the future is resolved, the data (or an error) is sent to the peer.
    /// Has unbounded size; the number of concurrent requests should be limited
    /// by the `Producer`, which may drop requests.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses are sent with priority over other network messages
    priority_responses: bool,

    /// Metrics for the engine
    metrics: metrics::Metrics<E>,

    /// Phantom data for networking types
    _r: PhantomData<NetR>,
}

impl<
        E: Clock + Spawner + Rng + Metrics,
        P: PublicKey,
        D: Provider<PublicKey = P>,
        B: Blocker<PublicKey = P>,
        Key: Span,
        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
        Pro: Producer<Key = Key>,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
{
    /// Creates a new `Engine` with the given configuration.
    ///
    /// Returns the engine and a mailbox to send messages to it.
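    ///
    /// # Example
    ///
    /// A minimal sketch; `context` and `cfg` are hypothetical values supplied
    /// by the embedding application:
    ///
    /// ```ignore
    /// let (engine, mailbox) = Engine::new(context, cfg);
    /// // `mailbox` issues fetch/cancel requests; `engine` must be started
    /// // (see `Engine::start`) before any of them are processed.
    /// ```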
    pub fn new(context: E, cfg: Config<P, D, B, Key, Con, Pro>) -> (Self, Mailbox<Key, P>) {
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);

        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());
        let fetcher = Fetcher::new(
            context.with_label("fetcher"),
            FetcherConfig {
                me: cfg.me,
                initial: cfg.initial,
                timeout: cfg.timeout,
                retry_timeout: cfg.fetch_retry_timeout,
                priority_requests: cfg.priority_requests,
            },
        );
        (
            Self {
                context: ContextCell::new(context),
                consumer: cfg.consumer,
                producer: cfg.producer,
                provider: cfg.provider,
                blocker: cfg.blocker,
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                fetch_timers: HashMap::new(),
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Fetching data from other peers and notifying the `Consumer`
    /// - Serving data to other peers by requesting it from the `Producer`
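    ///
    /// A sketch of the expected call pattern, assuming a network channel pair
    /// registered with the p2p layer (names are illustrative):
    ///
    /// ```ignore
    /// let handle = engine.start((net_sender, net_receiver));
    /// // `handle` resolves once the engine has shut down.
    /// ```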
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (NetS, NetR)) {
        let peer_set_subscription = &mut self.provider.subscribe().await;

        // Wrap channel
        let (mut sender, mut receiver) = wrap((), network.0, network.1);

        select_loop! {
            self.context,
            on_start => {
                // Update metrics
                let _ = self
                    .metrics
                    .fetch_pending
                    .try_set(self.fetcher.len_pending());
                let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
                let _ = self
                    .metrics
                    .peers_blocked
                    .try_set(self.fetcher.len_blocked());
                let _ = self.metrics.serve_processing.try_set(self.serves.len());

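                // `future::pending()` never resolves, so when no deadline is
                // set, the corresponding select branch below never fires.
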
                // Get retry timeout (if any)
                let deadline_pending = match self.fetcher.get_pending_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };

                // Get requester timeout (if any)
                let deadline_active = match self.fetcher.get_active_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
                self.serves.cancel_all();
            },
            // Handle peer set updates
            Some((id, _, all)) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                return;
            } => {
                // Instead of directing our requests exclusively at the latest set (which may
                // still be syncing), we reconcile with all tracked peers.
                if self.last_peer_set_id < Some(id) {
                    self.last_peer_set_id = Some(id);
                    self.fetcher.reconcile(all.as_ref());
                }
            },
            // Handle active deadline
            _ = deadline_active => {
                if let Some(key) = self.fetcher.pop_active() {
                    debug!(?key, "requester timeout");
                    self.metrics.fetch.inc(Status::Failure);
                    self.fetcher.add_retry(key);
                }
            },
            // Handle pending deadline
            _ = deadline_pending => {
                self.fetcher.fetch(&mut sender).await;
            },
            // Handle mailbox messages
            Some(msg) = self.mailbox.recv() else {
                error!("mailbox closed");
                return;
            } => {
                match msg {
                    Message::Fetch(requests) => {
                        for FetchRequest { key, targets } in requests {
                            trace!(?key, "mailbox: fetch");

                            // Check if the fetch is already in progress
                            let is_new = !self.fetch_timers.contains_key(&key);

                            // Update targets
                            match targets {
                                Some(targets) => {
                                    // Only add targets if this is a new fetch OR the existing
                                    // fetch already has targets. Don't restrict an "all" fetch
                                    // (no targets) to specific targets.
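                                    // For example, if `key` was first fetched with no
                                    // targets (ask any peer), a later request naming
                                    // specific peers must not narrow that fetch.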
                                    if is_new || self.fetcher.has_targets(&key) {
                                        self.fetcher.add_targets(key.clone(), targets);
                                    }
                                }
                                None => self.fetcher.clear_targets(&key),
                            }

                            // Only start new fetch if not already in progress
                            if is_new {
                                self.fetch_timers
                                    .insert(key.clone(), self.metrics.fetch_duration.timer());
                                self.fetcher.add_ready(key);
                            } else {
                                trace!(?key, "updated targets for existing fetch");
                            }
                        }
                    }
                    Message::Cancel { key } => {
                        trace!(?key, "mailbox: cancel");
                        let mut guard = self.metrics.cancel.guard(Status::Dropped);
                        if self.fetcher.cancel(&key) {
                            guard.set(Status::Success);
                            self.fetch_timers.remove(&key).unwrap().cancel(); // must exist, don't record metric
                            self.consumer.failed(key.clone(), ()).await;
                        }
                    }
                    Message::Retain { predicate } => {
                        trace!("mailbox: retain");

                        // Remove from fetcher
                        self.fetcher.retain(&predicate);

                        // Clean up timers and notify consumer
                        let before = self.fetch_timers.len();
                        let removed = self
                            .fetch_timers
                            .extract_if(|k, _| !predicate(k))
                            .collect::<Vec<_>>();
                        for (key, timer) in removed {
                            timer.cancel();
                            self.consumer.failed(key, ()).await;
                        }

                        // Metrics
                        let removed = (before - self.fetch_timers.len()) as u64;
                        if removed == 0 {
                            self.metrics.cancel.inc(Status::Dropped);
                        } else {
                            self.metrics.cancel.inc_by(Status::Success, removed);
                        }
                    }
                    Message::Clear => {
                        trace!("mailbox: clear");

                        // Clear fetcher
                        self.fetcher.clear();

                        // Drain timers and notify consumer
                        let removed = self.fetch_timers.len() as u64;
                        for (key, timer) in self.fetch_timers.drain() {
                            timer.cancel();
                            self.consumer.failed(key, ()).await;
                        }

                        // Metrics
                        if removed == 0 {
                            self.metrics.cancel.inc(Status::Dropped);
                        } else {
                            self.metrics.cancel.inc_by(Status::Success, removed);
                        }
                    }
                }
                assert_eq!(self.fetcher.len(), self.fetch_timers.len());
            },
            // Handle completed serve requests
            serve = self.serves.next_completed() => {
                let Serve {
                    timer,
                    peer,
                    id,
                    result,
                } = serve;

                // Metrics and logs
                match result {
                    Ok(_) => {
                        self.metrics.serve.inc(Status::Success);
                    }
                    Err(ref err) => {
                        debug!(?err, ?peer, ?id, "serve failed");
                        timer.cancel();
                        self.metrics.serve.inc(Status::Failure);
                    }
                }

                // Send response to peer
                self.handle_serve(&mut sender, peer, id, result, self.priority_responses)
                    .await;
            },
            // Handle network messages
            msg = receiver.recv() => {
                // Break if the receiver is closed
                let (peer, msg) = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        error!(?err, "receiver closed");
                        return;
                    }
                };

                // Skip if there is a decoding error
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        trace!(?err, ?peer, "decode failed");
                        continue;
                    }
                };
                match msg.payload {
                    wire::Payload::Request(key) => {
                        self.handle_network_request(peer, msg.id, key).await
                    }
                    wire::Payload::Response(response) => {
                        self.handle_network_response(peer, msg.id, response).await
                    }
                    wire::Payload::Error => self.handle_network_error_response(peer, msg.id).await,
                };
            },
        }
    }

    /// Handles the case where the application responds to a request from an external peer.
    async fn handle_serve(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Result<Bytes, oneshot::error::RecvError>,
        priority: bool,
    ) {
        // Encode message
        let payload: wire::Payload<Key> =
            response.map_or_else(|_| wire::Payload::Error, wire::Payload::Response);
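
        // Echo the request `id` so the peer can match this response to its
        // pending fetch.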
        let msg = wire::Message { id, payload };

        // Send message to peer
        let result = sender
            .send(Recipients::One(peer.clone()), msg, priority)
            .await;

        // Log result, but do not handle errors
        match result {
            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
            Ok(_) => trace!(?peer, ?id, "serve sent"),
        };
    }

    /// Handle a network request from a peer.
    async fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        // Serve the request
        trace!(?peer, ?id, "peer request");
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer();
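
        // `produce` yields a receiver that resolves once the `Producer` has
        // the data (or fails if the `Producer` drops the request).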
        self.serves.push(async move {
            let receiver = producer.produce(key).await;
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

    /// Handle a network response from a peer.
    async fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
        trace!(?peer, ?id, "peer response: data");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer had the data, so we can deliver it to the consumer
        if self.consumer.deliver(key.clone(), response).await {
            // Record metrics
            self.metrics.fetch.inc(Status::Success);
            self.fetch_timers.remove(&key).unwrap(); // must exist in the map, records metric on drop

            // Clear all targets for this key
            self.fetcher.clear_targets(&key);
            return;
        }

        // If the data is invalid, we need to block the peer and try again
        // (blocking the peer also removes any targets associated with it)
        self.blocker.block(peer.clone()).await;
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }

    /// Handle a network response from a peer that did not have the data.
    async fn handle_network_error_response(&mut self, peer: P, id: u64) {
        trace!(?peer, ?id, "peer response: error");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer did not have the data, so we need to try again
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }
}
457}