commonware_resolver/p2p/engine.rs

use super::{
    config::Config,
    fetcher::{Config as FetcherConfig, Fetcher},
    ingress::{FetchRequest, Mailbox, Message},
    metrics, wire, Producer,
};
use crate::Consumer;
use bytes::Bytes;
use commonware_cryptography::PublicKey;
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Manager, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, GaugeExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{futures::Pool as FuturesPool, Span};
use futures::{
    channel::{mpsc, oneshot},
    future::{self, Either},
    StreamExt,
};
use rand::Rng;
use std::{collections::HashMap, marker::PhantomData};
use tracing::{debug, error, trace, warn};

/// Represents a pending serve operation.
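///
/// The `timer` records the serve duration when dropped; on failure it is
/// canceled instead, so only successful serves feed the duration histogram.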
struct Serve<E: Clock, P: PublicKey> {
    timer: histogram::Timer<E>,
    peer: P,
    id: u64,
    result: Result<Bytes, oneshot::Canceled>,
}

/// Manages incoming and outgoing P2P requests, coordinating fetch and serve operations.
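///
/// Requests and responses are paired by `id` on the wire: an outgoing
/// `wire::Payload::Request(key)` is answered by the serving peer with the same
/// `id` and either `wire::Payload::Response(data)` or `wire::Payload::Error`
/// when the data cannot be produced.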
pub struct Engine<
    E: Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Manager<PublicKey = P>,
    B: Blocker<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Context used to spawn tasks, manage time, etc.
    context: ContextCell<E>,

    /// Consumes data that is fetched from the network
    consumer: Con,

    /// Produces data for incoming requests
    producer: Pro,

    /// Manages the list of peers that can be used to fetch data
    manager: D,

    /// The blocker that will be used to block peers that send invalid responses
    blocker: B,

    /// Used to detect changes in the peer set
    last_peer_set_id: Option<u64>,

    /// Mailbox that makes and cancels fetch requests
    mailbox: mpsc::Receiver<Message<Key, P>>,

    /// Manages outgoing fetch requests
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Track the start time of fetch operations
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// Holds futures that resolve once the `Producer` has produced the data.
    /// Once the future is resolved, the data (or an error) is sent to the peer.
    /// Has unbounded size; the number of concurrent requests should be limited
    /// by the `Producer` which may drop requests.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses are sent with priority over other network messages
    priority_responses: bool,

    /// Metrics for the peer actor
    metrics: metrics::Metrics<E>,

    /// Phantom data for networking types
    _s: PhantomData<NetS>,
    _r: PhantomData<NetR>,
}

impl<
        E: Clock + Spawner + Rng + Metrics,
        P: PublicKey,
        D: Manager<PublicKey = P>,
        B: Blocker<PublicKey = P>,
        Key: Span,
        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
        Pro: Producer<Key = Key>,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
{
    /// Creates a new `Engine` with the given configuration.
    ///
    /// Returns the engine and a mailbox used to send messages to it.
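    ///
    /// A minimal wiring sketch (construction of `cfg` and the network channel
    /// pair is elided; `net_sender`/`net_receiver` are placeholder names, and
    /// the consumer, producer, manager, and blocker are application-provided):
    ///
    /// ```ignore
    /// let (engine, mailbox) = Engine::new(context.with_label("resolver"), cfg);
    /// let handle = engine.start((net_sender, net_receiver));
    /// // `mailbox` is handed to whichever component issues fetch/cancel requests.
    /// ```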
    pub fn new(context: E, cfg: Config<P, D, B, Key, Con, Pro>) -> (Self, Mailbox<Key, P>) {
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);

        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());
        let fetcher = Fetcher::new(
            context.with_label("fetcher"),
            FetcherConfig {
                me: cfg.me,
                initial: cfg.initial,
                timeout: cfg.timeout,
                retry_timeout: cfg.fetch_retry_timeout,
                priority_requests: cfg.priority_requests,
            },
        );
        (
            Self {
                context: ContextCell::new(context),
                consumer: cfg.consumer,
                producer: cfg.producer,
                manager: cfg.manager,
                blocker: cfg.blocker,
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                fetch_timers: HashMap::new(),
                _s: PhantomData,
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

    /// Runs the actor until the context is stopped.
    ///
    /// The actor will handle:
    /// - Fetching data from other peers and notifying the `Consumer`
    /// - Serving data to other peers by requesting it from the `Producer`
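    ///
    /// The returned [`Handle`] resolves once the run loop exits: on shutdown,
    /// or when the mailbox, peer-set subscription, or network receiver closes.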
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (NetS, NetR)) {
        let mut shutdown = self.context.stopped();
        let peer_set_subscription = &mut self.manager.subscribe().await;

        // Wrap channel
        let (mut sender, mut receiver) = wrap((), network.0, network.1);

        loop {
            // Update metrics
            let _ = self
                .metrics
                .fetch_pending
                .try_set(self.fetcher.len_pending());
            let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
            let _ = self
                .metrics
                .peers_blocked
                .try_set(self.fetcher.len_blocked());
            let _ = self.metrics.serve_processing.try_set(self.serves.len());

            // Get retry timeout (if any)
            let deadline_pending = match self.fetcher.get_pending_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Get requester timeout (if any)
            let deadline_active = match self.fetcher.get_active_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
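            // In both cases a missing deadline becomes `future::pending()`,
            // which never resolves and leaves that select! arm inert.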

            // Handle shutdown signal
            select! {
                _ = &mut shutdown => {
                    debug!("shutdown");
                    self.serves.cancel_all();
                    return;
                },

                // Handle peer set updates
                peer_set_update = peer_set_subscription.next() => {
                    let Some((id, _, all)) = peer_set_update else {
                        debug!("peer set subscription closed");
                        return;
                    };

                    // Instead of directing our requests exclusively at the
                    // latest set (which may still be syncing), we reconcile
                    // with all tracked peers.
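                    // (`None < Some(id)` for any `id`, so the first update always reconciles.)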
                    if self.last_peer_set_id < Some(id) {
                        self.last_peer_set_id = Some(id);
                        self.fetcher.reconcile(all.as_ref());
                    }
                },

                // Handle active deadline
                _ = deadline_active => {
                    if let Some(key) = self.fetcher.pop_active() {
                        debug!(?key, "requester timeout");
                        self.metrics.fetch.inc(Status::Failure);
                        self.fetcher.add_retry(key);
                    }
                },

                // Handle pending deadline
                _ = deadline_pending => {
                    self.fetcher.fetch(&mut sender).await;
                },

                // Handle mailbox messages
                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        error!("mailbox closed");
                        return;
                    };
                    match msg {
                        Message::Fetch(requests) => {
                            for FetchRequest { key, targets } in requests {
                                trace!(?key, "mailbox: fetch");

                                // Check if the fetch is already in progress
                                let is_new = !self.fetch_timers.contains_key(&key);

                                // Update targets
                                match targets {
                                    Some(targets) => {
                                        // Only add targets if this is a new fetch OR the existing
                                        // fetch already has targets. Don't restrict an "all" fetch
                                        // (no targets) to specific targets.
                                        if is_new || self.fetcher.has_targets(&key) {
                                            self.fetcher.add_targets(key.clone(), targets);
                                        }
                                    }
                                    None => self.fetcher.clear_targets(&key),
                                }
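                                // Net effect: an untargeted fetch widens an
                                // existing targeted fetch to all peers, but a
                                // targeted fetch never narrows an "all" fetch.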

                                // Only start new fetch if not already in progress
                                if is_new {
                                    self.fetch_timers.insert(key.clone(), self.metrics.fetch_duration.timer());
                                    self.fetcher.add_ready(key);
                                } else {
                                    trace!(?key, "updated targets for existing fetch");
                                }
                            }
                        }
                        Message::Cancel { key } => {
                            trace!(?key, "mailbox: cancel");
                            let mut guard = self.metrics.cancel.guard(Status::Dropped);
                            if self.fetcher.cancel(&key) {
                                guard.set(Status::Success);
                                self.fetch_timers.remove(&key).unwrap().cancel(); // must exist, don't record metric
                                self.consumer.failed(key.clone(), ()).await;
                            }
                        }
                        Message::Retain { predicate } => {
                            trace!("mailbox: retain");

                            // Remove from fetcher
                            self.fetcher.retain(&predicate);

                            // Clean up timers and notify consumer
                            let before = self.fetch_timers.len();
                            let removed = self.fetch_timers.extract_if(|k, _| !predicate(k)).collect::<Vec<_>>();
                            for (key, timer) in removed {
                                timer.cancel();
                                self.consumer.failed(key, ()).await;
                            }

                            // Metrics
                            let removed = (before - self.fetch_timers.len()) as u64;
                            if removed == 0 {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, removed);
                            }
                        }
                        Message::Clear => {
                            trace!("mailbox: clear");

                            // Clear fetcher
                            self.fetcher.clear();

                            // Drain timers and notify consumer
                            let removed = self.fetch_timers.len() as u64;
                            for (key, timer) in self.fetch_timers.drain() {
                                timer.cancel();
                                self.consumer.failed(key, ()).await;
                            }

                            // Metrics
                            if removed == 0 {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, removed);
                            }
                        }
                    }
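                    // Invariant: every fetch tracked by the fetcher has a matching timer.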
                    assert_eq!(self.fetcher.len(), self.fetch_timers.len());
                },

                // Handle completed serve requests
                serve = self.serves.next_completed() => {
                    let Serve { timer, peer, id, result } = serve;

                    // Metrics and logs
                    match result {
                        Ok(_) => {
                            self.metrics.serve.inc(Status::Success);
                        }
                        Err(err) => {
                            debug!(?err, ?peer, ?id, "serve failed");
                            timer.cancel();
                            self.metrics.serve.inc(Status::Failure);
                        }
                    }

                    // Send response to peer
                    self.handle_serve(&mut sender, peer, id, result, self.priority_responses).await;
                },

                // Handle network messages
                msg = receiver.recv() => {
                    // Return if the receiver is closed
                    let (peer, msg) = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            error!(?err, "receiver closed");
                            return;
                        }
                    };

                    // Skip if there is a decoding error
                    let msg = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            trace!(?err, ?peer, "decode failed");
                            continue;
                        }
                    };
                    match msg.payload {
                        wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key).await,
                        wire::Payload::Response(response) => self.handle_network_response(peer, msg.id, response).await,
                        wire::Payload::Error => self.handle_network_error_response(peer, msg.id).await,
                    };
                },
            }
        }
    }

    /// Handles the case where the application responds to a request from an external peer.
    async fn handle_serve(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Result<Bytes, oneshot::Canceled>,
        priority: bool,
    ) {
        // Encode message
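        // (`Err(Canceled)` means the `Producer` dropped the request; it is sent
        // as `Payload::Error`, which the requesting peer treats as "data not
        // available" and retries elsewhere.)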
        let payload: wire::Payload<Key> =
            response.map_or_else(|_| wire::Payload::Error, wire::Payload::Response);
        let msg = wire::Message { id, payload };

        // Send message to peer
        let result = sender
            .send(Recipients::One(peer.clone()), msg, priority)
            .await;

        // Log result, but do not handle errors
        match result {
            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
            Ok(_) => trace!(?peer, ?id, "serve sent"),
        };
    }

    /// Handle a network request from a peer.
    async fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        // Serve the request
        trace!(?peer, ?id, "peer request");
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer();
        self.serves.push(async move {
            let receiver = producer.produce(key).await;
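            // The `Producer` may drop its end of this oneshot channel to reject
            // the request, which surfaces below as `Err(Canceled)`.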
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

    /// Handle a network response from a peer.
    async fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
        trace!(?peer, ?id, "peer response: data");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer had the data, so we can deliver it to the consumer
        if self.consumer.deliver(key.clone(), response).await {
            // Record metrics
            self.metrics.fetch.inc(Status::Success);
            self.fetch_timers.remove(&key).unwrap(); // must exist in the map, records metric on drop

            // Clear all targets for this key
            self.fetcher.clear_targets(&key);
            return;
        }

        // If the data is invalid, we need to block the peer and try again
        // (blocking the peer also removes any targets associated with it)
        self.blocker.block(peer.clone()).await;
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }

    /// Handle a network response from a peer that did not have the data.
    async fn handle_network_error_response(&mut self, peer: P, id: u64) {
        trace!(?peer, ?id, "peer response: error");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer did not have the data, so we need to try again
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }
}