Skip to main content

commonware_p2p/authenticated/discovery/actors/spawner/
ingress.rs

1use crate::authenticated::{discovery::actors::tracker::Reservation, Mailbox};
2use commonware_actor::{mailbox::UnreliablePolicy, Feedback, Unreliable};
3use commonware_cryptography::PublicKey;
4use commonware_runtime::{Sink, Stream};
5use commonware_stream::encrypted::{Receiver, Sender};
6use std::collections::VecDeque;
7
8/// Messages that can be processed by the spawner actor.
9pub enum Message<O: Sink, I: Stream, P: PublicKey> {
10    /// Notify the spawner to create a new task for the given peer.
11    Spawn {
12        /// The peer's public key.
13        peer: P,
14        /// The connection to the peer.
15        connection: (Sender<O>, Receiver<I>),
16        /// The reservation for the peer.
17        reservation: Reservation<P>,
18    },
19}
20
21impl<P: PublicKey, O: Sink, I: Stream> UnreliablePolicy for Message<O, I, P> {
22    type Overflow = VecDeque<Self>;
23
24    fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool {
25        // We drop spawn requests when we are backlogged because it is more likely
26        // than not that by the time we get around to handling it the peer connection
27        // will have already timed out (and closed).
28        false
29    }
30}
31
32impl<P: PublicKey, O: Sink, I: Stream> Mailbox<Message<O, I, P>> {
33    /// Send a message to the actor to spawn a new task for the given peer.
34    ///
35    /// This may be rejected when the spawner is backlogged, or return closed after shutdown, which
36    /// is harmless since stale connections do not need to be spawned.
37    pub fn spawn(
38        &mut self,
39        connection: (Sender<O>, Receiver<I>),
40        reservation: Reservation<P>,
41    ) -> Unreliable<Feedback> {
42        self.0.enqueue(Message::Spawn {
43            peer: reservation.metadata().public_key().clone(),
44            connection,
45            reservation,
46        })
47    }
48}
49
50#[cfg(test)]
51mod tests {
52    use super::*;
53    use crate::authenticated::discovery::actors::tracker::{self, Metadata};
54    use commonware_actor::mailbox;
55    use commonware_cryptography::{
56        ed25519::{PrivateKey, PublicKey},
57        Signer as _,
58    };
59    use commonware_runtime::{deterministic, mocks, Runner as _, Spawner as _, Supervisor as _};
60    use commonware_stream::encrypted::{
61        dial, listen, Config as StreamConfig, Receiver as EncryptedReceiver,
62        Sender as EncryptedSender,
63    };
64    use commonware_utils::NZUsize;
65    use futures::FutureExt as _;
66    use std::time::Duration;
67
68    const STREAM_NAMESPACE: &[u8] = b"test_discovery_spawner_ingress";
69    const MAX_MESSAGE_SIZE: u32 = 64 * 1024;
70
71    type Connection = (
72        EncryptedSender<mocks::Sink>,
73        EncryptedReceiver<mocks::Stream>,
74    );
75
76    fn stream_config(key: PrivateKey) -> StreamConfig<PrivateKey> {
77        StreamConfig {
78            signing_key: key,
79            namespace: STREAM_NAMESPACE.to_vec(),
80            max_message_size: MAX_MESSAGE_SIZE,
81            synchrony_bound: Duration::from_secs(10),
82            max_handshake_age: Duration::from_secs(10),
83            handshake_timeout: Duration::from_secs(10),
84        }
85    }
86
87    async fn connections(
88        context: &deterministic::Context,
89        local_key: PrivateKey,
90        remote_key: PrivateKey,
91    ) -> (Connection, Connection) {
92        let local_pk = local_key.public_key();
93        let remote_pk = remote_key.public_key();
94        let (local_sink, remote_stream) = mocks::Channel::init();
95        let (remote_sink, local_stream) = mocks::Channel::init();
96
97        let listener = context.child("listener").spawn({
98            let expected = local_pk.clone();
99            move |context| async move {
100                listen(
101                    context,
102                    |_| async { true },
103                    stream_config(remote_key),
104                    remote_stream,
105                    remote_sink,
106                )
107                .await
108                .map(|(peer, sender, receiver)| {
109                    assert_eq!(peer, expected);
110                    (sender, receiver)
111                })
112            }
113        });
114
115        let dialer = dial(
116            context.child("dialer"),
117            stream_config(local_key),
118            remote_pk,
119            local_stream,
120            local_sink,
121        )
122        .await
123        .expect("dial failed");
124
125        let listener = listener
126            .await
127            .expect("listener task failed")
128            .expect("listen failed");
129
130        (dialer, listener)
131    }
132
133    #[test]
134    fn spawn_overflow_rejects_message_and_releases_reservation() {
135        deterministic::Runner::default().start(|context| async move {
136            let (connection_1, connection_2) =
137                connections(&context, PrivateKey::from_seed(1), PrivateKey::from_seed(2)).await;
138            let peer_1 = PrivateKey::from_seed(1).public_key();
139            let peer_2 = PrivateKey::from_seed(2).public_key();
140
141            let (mut spawner, mut receiver) =
142                Mailbox::<Message<mocks::Sink, mocks::Stream, PublicKey>>::new(
143                    context.child("spawner_mailbox"),
144                    NZUsize!(1),
145                );
146            let (tracker_sender, mut tracker_receiver) =
147                mailbox::new::<tracker::Message<PublicKey>>(
148                    context.child("tracker_mailbox"),
149                    NZUsize!(10),
150                );
151            let releaser = tracker::ingress::Releaser::new(tracker_sender);
152
153            let reservation_1 =
154                Reservation::new(Metadata::Listener(peer_1.clone()), releaser.clone());
155            let reservation_2 = Reservation::new(Metadata::Listener(peer_2.clone()), releaser);
156
157            assert_eq!(
158                spawner.spawn(connection_1, reservation_1),
159                Unreliable::new(Feedback::Ok)
160            );
161            assert_eq!(
162                spawner.spawn(connection_2, reservation_2),
163                Unreliable::Rejected
164            );
165
166            let release = tracker_receiver
167                .recv()
168                .await
169                .expect("release should be enqueued");
170            let tracker::Message::Release { metadata } = release else {
171                panic!("unexpected tracker message");
172            };
173            assert_eq!(metadata.public_key(), &peer_2);
174
175            let Message::Spawn { peer, .. } = receiver
176                .recv()
177                .await
178                .expect("ready spawn should be retained");
179            assert_eq!(peer, peer_1);
180            assert!(receiver.recv().now_or_never().is_none());
181        });
182    }
183}