commonware_p2p/authenticated/lookup/actors/spawner/
ingress.rs1use crate::authenticated::{lookup::actors::tracker::Reservation, Mailbox};
2use commonware_actor::{mailbox::UnreliablePolicy, Feedback, Unreliable};
3use commonware_cryptography::PublicKey;
4use commonware_runtime::{Sink, Stream};
5use commonware_stream::encrypted::{Receiver, Sender};
6use std::collections::VecDeque;
7
8pub enum Message<Si: Sink, St: Stream, P: PublicKey> {
10 Spawn {
12 peer: P,
14 connection: (Sender<Si>, Receiver<St>),
16 reservation: Reservation<P>,
18 },
19}
20
21impl<Si: Sink, St: Stream, P: PublicKey> UnreliablePolicy for Message<Si, St, P> {
22 type Overflow = VecDeque<Self>;
23
24 fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool {
25 false
29 }
30}
31
32impl<Si: Sink, St: Stream, P: PublicKey> Mailbox<Message<Si, St, P>> {
33 pub fn spawn(
38 &mut self,
39 connection: (Sender<Si>, Receiver<St>),
40 reservation: Reservation<P>,
41 ) -> Unreliable<Feedback> {
42 self.0.enqueue(Message::Spawn {
43 peer: reservation.metadata().public_key().clone(),
44 connection,
45 reservation,
46 })
47 }
48}
49
50#[cfg(test)]
51mod tests {
52 use super::*;
53 use crate::authenticated::lookup::actors::tracker::{self, Metadata};
54 use commonware_actor::mailbox;
55 use commonware_cryptography::{
56 ed25519::{PrivateKey, PublicKey},
57 Signer as _,
58 };
59 use commonware_runtime::{deterministic, mocks, Runner as _, Spawner as _, Supervisor as _};
60 use commonware_stream::encrypted::{
61 dial, listen, Config as StreamConfig, Receiver as EncryptedReceiver,
62 Sender as EncryptedSender,
63 };
64 use commonware_utils::NZUsize;
65 use futures::FutureExt as _;
66 use std::time::Duration;
67
68 const STREAM_NAMESPACE: &[u8] = b"test_lookup_spawner_ingress";
69 const MAX_MESSAGE_SIZE: u32 = 64 * 1024;
70
71 type Connection = (
72 EncryptedSender<mocks::Sink>,
73 EncryptedReceiver<mocks::Stream>,
74 );
75
76 fn stream_config(key: PrivateKey) -> StreamConfig<PrivateKey> {
77 StreamConfig {
78 signing_key: key,
79 namespace: STREAM_NAMESPACE.to_vec(),
80 max_message_size: MAX_MESSAGE_SIZE,
81 synchrony_bound: Duration::from_secs(10),
82 max_handshake_age: Duration::from_secs(10),
83 handshake_timeout: Duration::from_secs(10),
84 }
85 }
86
87 async fn connections(
88 context: &deterministic::Context,
89 local_key: PrivateKey,
90 remote_key: PrivateKey,
91 ) -> (Connection, Connection) {
92 let local_pk = local_key.public_key();
93 let remote_pk = remote_key.public_key();
94 let (local_sink, remote_stream) = mocks::Channel::init();
95 let (remote_sink, local_stream) = mocks::Channel::init();
96
97 let listener = context.child("listener").spawn({
98 let expected = local_pk.clone();
99 move |context| async move {
100 listen(
101 context,
102 |_| async { true },
103 stream_config(remote_key),
104 remote_stream,
105 remote_sink,
106 )
107 .await
108 .map(|(peer, sender, receiver)| {
109 assert_eq!(peer, expected);
110 (sender, receiver)
111 })
112 }
113 });
114
115 let dialer = dial(
116 context.child("dialer"),
117 stream_config(local_key),
118 remote_pk,
119 local_stream,
120 local_sink,
121 )
122 .await
123 .expect("dial failed");
124
125 let listener = listener
126 .await
127 .expect("listener task failed")
128 .expect("listen failed");
129
130 (dialer, listener)
131 }
132
133 #[test]
134 fn spawn_overflow_rejects_message_and_releases_reservation() {
135 deterministic::Runner::default().start(|context| async move {
136 let (connection_1, connection_2) =
137 connections(&context, PrivateKey::from_seed(1), PrivateKey::from_seed(2)).await;
138 let peer_1 = PrivateKey::from_seed(1).public_key();
139 let peer_2 = PrivateKey::from_seed(2).public_key();
140
141 let (mut spawner, mut receiver) =
142 Mailbox::<Message<mocks::Sink, mocks::Stream, PublicKey>>::new(
143 context.child("spawner_mailbox"),
144 NZUsize!(1),
145 );
146 let (tracker_sender, mut tracker_receiver) =
147 mailbox::new::<tracker::Message<PublicKey>>(
148 context.child("tracker_mailbox"),
149 NZUsize!(10),
150 );
151 let releaser = tracker::ingress::Releaser::new(tracker_sender);
152
153 let reservation_1 =
154 Reservation::new(Metadata::Listener(peer_1.clone()), releaser.clone());
155 let reservation_2 = Reservation::new(Metadata::Listener(peer_2.clone()), releaser);
156
157 assert_eq!(
158 spawner.spawn(connection_1, reservation_1),
159 Unreliable::new(Feedback::Ok)
160 );
161 assert_eq!(
162 spawner.spawn(connection_2, reservation_2),
163 Unreliable::Rejected
164 );
165
166 let release = tracker_receiver
167 .recv()
168 .await
169 .expect("release should be enqueued");
170 let tracker::Message::Release { metadata } = release else {
171 panic!("unexpected tracker message");
172 };
173 assert_eq!(metadata.public_key(), &peer_2);
174
175 let Message::Spawn { peer, .. } = receiver
176 .recv()
177 .await
178 .expect("ready spawn should be retained");
179 assert_eq!(peer, peer_1);
180 assert!(receiver.recv().now_or_never().is_none());
181 });
182 }
183}