commonware_resolver/p2p/
engine.rs

use super::{
    config::Config,
    fetcher::Fetcher,
    ingress::{Mailbox, Message},
    metrics, wire, Producer,
};
use crate::Consumer;
use bytes::Bytes;
use commonware_cryptography::PublicKey;
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Manager, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{futures::Pool as FuturesPool, Span};
use futures::{
    channel::{mpsc, oneshot},
    future::{self, Either},
    StreamExt,
};
use governor::clock::Clock as GClock;
use rand::Rng;
use std::{collections::HashMap, marker::PhantomData};
use tracing::{debug, error, trace, warn};

/// Represents a pending serve operation.
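/// Holds the serve timer, the requesting peer, the request id, and the
/// `Producer`'s result (`Err` if the response channel was canceled).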
struct Serve<E: Clock, P: PublicKey> {
    timer: histogram::Timer<E>,
    peer: P,
    id: u64,
    result: Result<Bytes, oneshot::Canceled>,
}

/// Manages incoming and outgoing P2P requests, coordinating fetch and serve operations.
pub struct Engine<
    E: Clock + GClock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Manager<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Context used to spawn tasks, manage time, etc.
    context: ContextCell<E>,

    /// Consumes data that is fetched from the network
    consumer: Con,

    /// Produces data for incoming requests
    producer: Pro,

    /// Manages the list of peers that can be used to fetch data
    manager: D,

    /// Used to detect changes in the peer set
    last_peer_set_id: Option<u64>,

    /// Mailbox that makes and cancels fetch requests
    mailbox: mpsc::Receiver<Message<Key>>,

    /// Manages outgoing fetch requests
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Track the start time of fetch operations
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// Holds futures that resolve once the `Producer` has produced the data.
    /// Once the future is resolved, the data (or an error) is sent to the peer.
    /// Has unbounded size; the number of concurrent requests should be limited
    /// by the `Producer` which may drop requests.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses are sent with priority over other network messages
    priority_responses: bool,

    /// Metrics for the peer actor
    metrics: metrics::Metrics<E>,

    /// Phantom data for networking types
    _s: PhantomData<NetS>,
    _r: PhantomData<NetR>,
}

impl<
        E: Clock + GClock + Spawner + Rng + Metrics,
        P: PublicKey,
        D: Manager<PublicKey = P>,
        Key: Span,
        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
        Pro: Producer<Key = Key>,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, Key, Con, Pro, NetS, NetR>
{
    /// Creates a new `Engine` with the given configuration.
    ///
    /// Returns the actor and a mailbox to send messages to it.
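    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `context` and `cfg` are supplied by the
    /// application (placeholders, not values defined in this module):
    ///
    /// ```ignore
    /// let (engine, mailbox) = Engine::new(context, cfg);
    /// // Hand `mailbox` to whatever issues fetch requests, then call
    /// // `engine.start(...)` with a network sender/receiver pair.
    /// ```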
    pub fn new(context: E, cfg: Config<P, D, Key, Con, Pro>) -> (Self, Mailbox<Key>) {
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);

        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());
        let fetcher = Fetcher::new(
            context.with_label("fetcher"),
            cfg.requester_config,
            cfg.fetch_retry_timeout,
            cfg.priority_requests,
        );
        (
            Self {
                context: ContextCell::new(context),
                consumer: cfg.consumer,
                producer: cfg.producer,
                manager: cfg.manager,
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                fetch_timers: HashMap::new(),
                _s: PhantomData,
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

    /// Runs the actor until the context is stopped.
    ///
    /// The actor will handle:
    /// - Fetching data from other peers and notifying the `Consumer`
    /// - Serving data to other peers by requesting it from the `Producer`
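    ///
    /// # Example
    ///
    /// A sketch of driving the engine, assuming `engine` and `mailbox` come from
    /// [`Engine::new`], `net_sender`/`net_receiver` are the application's network
    /// channel pair, and the mailbox exposes helpers wrapping [`Message::Fetch`]
    /// and [`Message::Cancel`] (hypothetical method names):
    ///
    /// ```ignore
    /// let handle = engine.start((net_sender, net_receiver));
    /// mailbox.fetch(key).await;  // request `key` from the network
    /// mailbox.cancel(key).await; // give up; an in-flight fetch is dropped and the `Consumer` notified
    /// ```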
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (NetS, NetR)) {
        let mut shutdown = self.context.stopped();
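        // Subscribe to peer set updates so the fetcher can be reconciled
        // against the currently tracked peers.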
        let peer_set_subscription = &mut self.manager.subscribe().await;

        // Wrap channel
        let (mut sender, mut receiver) = wrap((), network.0, network.1);

        loop {
            // Update metrics
            self.metrics
                .fetch_pending
                .set(self.fetcher.len_pending() as i64);
            self.metrics
                .fetch_active
                .set(self.fetcher.len_active() as i64);
            self.metrics
                .peers_blocked
                .set(self.fetcher.len_blocked() as i64);
            self.metrics.serve_processing.set(self.serves.len() as i64);

            // Get retry timeout (if any)
            let deadline_pending = match self.fetcher.get_pending_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Get requester timeout (if any)
            let deadline_active = match self.fetcher.get_active_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Handle shutdown signal
            select! {
                _ = &mut shutdown => {
                    debug!("shutdown");
                    self.serves.cancel_all();
                    return;
                },

                // Handle peer set updates
                peer_set_update = peer_set_subscription.next() => {
                    let Some((id, _, all)) = peer_set_update else {
                        debug!("peer set subscription closed");
                        return;
                    };

                    // Instead of directing our requests exclusively at the latest peer set (which
                    // may still be syncing), we reconcile with all tracked peers.
                    if self.last_peer_set_id < Some(id) {
                        self.last_peer_set_id = Some(id);
                        self.fetcher.reconcile(all.as_ref());
                    }
                },

                // Handle mailbox messages
                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        error!("mailbox closed");
                        return;
                    };
                    match msg {
                        Message::Fetch { key } => {
                            trace!(?key, "mailbox: fetch");

                            // Check if the fetch is already in progress
                            if self.fetch_timers.contains_key(&key) {
                                trace!(?key, "duplicate fetch");
                                self.metrics.fetch.inc(Status::Dropped);
                                continue;
                            }

                            // Record fetch start time
                            self.fetch_timers.insert(key.clone(), self.metrics.fetch_duration.timer());
                            self.fetcher.fetch(&mut sender, key, true).await;
                        }
                        Message::Cancel { key } => {
                            trace!(?key, "mailbox: cancel");
                            let mut guard = self.metrics.cancel.guard(Status::Dropped);
                            if self.fetcher.cancel(&key) {
                                guard.set(Status::Success);
                                self.fetch_timers.remove(&key).unwrap().cancel(); // must exist, don't record metric
                                self.consumer.failed(key.clone(), ()).await;
                            }
                        }
                        Message::Retain { predicate } => {
                            trace!("mailbox: retain");
                            let before = self.fetcher.len();
                            self.fetcher.retain(predicate);
                            let after = self.fetcher.len();
                            if before == after {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, before.checked_sub(after).unwrap() as u64);
                            }
                        }
                        Message::Clear => {
                            trace!("mailbox: clear");
                            let before = self.fetcher.len();
                            self.fetcher.clear();
                            let after = self.fetcher.len();
                            if before == after {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, before.checked_sub(after).unwrap() as u64);
                            }
                        }
                    }
                },

                // Handle completed serve requests
                serve = self.serves.next_completed() => {
                    let Serve { timer, peer, id, result } = serve;

                    // Metrics and logs
                    match result {
                        Ok(_) => {
                            self.metrics.serve.inc(Status::Success);
                        }
                        Err(err) => {
                            debug!(?err, ?peer, ?id, "serve failed");
                            timer.cancel();
                            self.metrics.serve.inc(Status::Failure);
                        }
                    }

                    // Send response to peer
                    self.handle_serve(&mut sender, peer, id, result, self.priority_responses).await;
                },

                // Handle network messages
                msg = receiver.recv() => {
                    // Exit if the receiver is closed
                    let (peer, msg) = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            error!(?err, "receiver closed");
                            return;
                        }
                    };

                    // Skip if there is a decoding error
                    let msg = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            trace!(?err, ?peer, "decode failed");
                            continue;
                        }
                    };
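                    // Dispatch on the payload: requests are served via the `Producer`,
                    // while (error) responses resolve an outstanding fetch.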
                    match msg.payload {
                        wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key).await,
                        wire::Payload::Response(response) => self.handle_network_response(&mut sender, peer, msg.id, response).await,
                        wire::Payload::ErrorResponse => self.handle_network_error_response(&mut sender, peer, msg.id).await,
                    };
                },

                // Handle pending deadline
                _ = deadline_pending => {
                    let key = self.fetcher.pop_pending();
                    debug!(?key, "retrying");
                    self.metrics.fetch.inc(Status::Failure);
                    self.fetcher.fetch(&mut sender, key, false).await;
                },

                // Handle active deadline
                _ = deadline_active => {
                    if let Some(key) = self.fetcher.pop_active() {
                        debug!(?key, "requester timeout");
                        self.metrics.fetch.inc(Status::Failure);
                        self.fetcher.fetch(&mut sender, key, false).await;
                    }
                },
            }
        }
    }

    /// Handles the case where the application responds to a request from an external peer.
    async fn handle_serve(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Result<Bytes, oneshot::Canceled>,
        priority: bool,
    ) {
        // Encode message
        let payload: wire::Payload<Key> = match response {
            Ok(data) => wire::Payload::Response(data),
            Err(_) => wire::Payload::ErrorResponse,
        };
        let msg = wire::Message { id, payload };

        // Send message to peer
        let result = sender
            .send(Recipients::One(peer.clone()), msg, priority)
            .await;

        // Log result, but do not handle errors
        match result {
            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
            Ok(_) => trace!(?peer, ?id, "serve sent"),
        };
    }

    /// Handle a network request from a peer.
    async fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        // Serve the request
        trace!(?peer, ?id, "peer request");
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer();
        self.serves.push(async move {
            let receiver = producer.produce(key).await;
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

    /// Handle a network response from a peer.
    async fn handle_network_response(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Bytes,
    ) {
        trace!(?peer, ?id, "peer response: data");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer had the data, so we can deliver it to the consumer
        if self.consumer.deliver(key.clone(), response).await {
            // Record metrics
            self.metrics.fetch.inc(Status::Success);
            self.fetch_timers.remove(&key).unwrap(); // must exist in the map, records metric on drop
            return;
        }

        // If the data is invalid, we need to block the peer and try again
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.fetch(sender, key, false).await;
    }

    /// Handle a network response from a peer that did not have the data.
    async fn handle_network_error_response(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
    ) {
        trace!(?peer, ?id, "peer response: error");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer did not have the data, so we need to try again
        self.metrics.fetch.inc(Status::Failure);
        // Don't reset start time for retries, keep the original
        self.fetcher.fetch(sender, key, false).await;
    }
}