1#![allow(dead_code)]
2
3use std::sync::{Arc, Weak};
4
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::NetAddress;
9use mm1_common::errors::chain::StdErrorDisplayChainExt;
10use mm1_common::log;
11use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, mpsc};
12
13#[derive(derive_more::Debug)]
14pub(crate) struct Registry<S, M> {
15 #[debug(skip)]
16 networks: scc::TreeIndex<AddressRange, Node<S, M>>,
17}
18
19pub(crate) struct Node<S, M> {
20 subnet_lease: Arc<Lease>,
21 inbox_size: usize,
22 inbox_semaphore: Arc<Semaphore>,
23 mailbox_tx: SubnetMailboxTx<S, M>,
24}
25
26pub(crate) fn new_mailbox<S, M>() -> (SubnetMailboxTx<S, M>, SubnetMailboxRx<S, M>) {
27 let subnet_notify = Arc::new(Notify::new());
28 let (tx_system, rx_system) = mpsc::unbounded_channel();
29 let (tx_priority, rx_priority) = kanal::unbounded();
30 let (tx_regular, rx_regular) = kanal::unbounded();
31
32 let tx_priority = tx_priority.into();
33 let tx_regular = tx_regular.into();
34
35 let tx = SubnetMailboxTx {
36 tx_system,
37 subnet_notify: subnet_notify.clone(),
38 tx_priority,
39 tx_regular,
40 };
41 let rx = SubnetMailboxRx {
42 rx_system,
43 subnet_notify,
44 rx_priority,
45 rx_regular,
46 };
47
48 (tx, rx)
49}
50
51pub(crate) struct SubnetMailboxTx<S, M> {
52 pub(crate) tx_system: mpsc::UnboundedSender<S>,
53
54 pub(crate) subnet_notify: Arc<Notify>,
55 pub(crate) tx_priority: Arc<kanal::Sender<MessageWithoutPermit<M>>>,
56 pub(crate) tx_regular: Arc<kanal::Sender<MessageWithPermit<M>>>,
57}
58
59pub(crate) struct WeakSubnetMailboxTx<S, M> {
60 pub(crate) tx_system: mpsc::WeakUnboundedSender<S>,
61
62 pub(crate) subnet_notify: Weak<Notify>,
63 pub(crate) tx_priority: Weak<kanal::Sender<MessageWithoutPermit<M>>>,
64 pub(crate) tx_regular: Weak<kanal::Sender<MessageWithPermit<M>>>,
65}
66
67pub(crate) struct SubnetMailboxRx<S, M> {
68 pub(crate) rx_system: mpsc::UnboundedReceiver<S>,
69 pub(crate) subnet_notify: Arc<Notify>,
70 pub(crate) rx_priority: kanal::Receiver<MessageWithoutPermit<M>>,
71 pub(crate) rx_regular: kanal::Receiver<MessageWithPermit<M>>,
72}
73
74pub(crate) struct MessageWithoutPermit<M> {
75 pub(crate) to: Address,
76 pub(crate) message: M,
77}
78
79pub(crate) struct MessageWithPermit<M> {
80 pub(crate) to: Address,
81 pub(crate) message: M,
82 permit: OwnedSemaphorePermit,
83}
84
85impl<S, M> Registry<S, M>
86where
87 S: 'static,
88 M: 'static,
89{
90 pub(crate) fn new() -> Self {
91 Default::default()
92 }
93
94 pub(crate) fn register(
95 &self,
96 subnet_address: NetAddress,
97 node: Node<S, M>,
98 ) -> Result<(), Node<S, M>> {
99 let address_range = AddressRange::from(subnet_address);
100 self.networks
101 .insert(address_range, node)
102 .inspect_err(
103 |(address_range, _node)| log::warn!(%address_range, "failed to bind address range"),
104 )
105 .map_err(|(_address_range, node)| node)?;
106 log::trace!(%address_range, "register: registered");
107 Ok(())
108 }
109
110 pub(crate) fn unregister(&self, subnet_address: NetAddress) -> bool {
111 let guard = Default::default();
112 let sought_range = AddressRange::from(subnet_address);
113 let Some((found_range, _)) = self.networks.peek_entry(&sought_range, &guard) else {
114 log::trace!(
115 %sought_range,
116 "unregister: sought-range not found"
117 );
118 return false
119 };
120 if *found_range != sought_range {
121 log::error!(
122 %sought_range,
123 %found_range,
124 "unregister: sought-range is not equal to the found range"
125 );
126 return false
127 }
128 let removed = self.networks.remove(&sought_range);
129 log::trace!(%sought_range, %removed, "unregister: removing range");
130
131 removed
132 }
133
134 pub(crate) fn lookup(&self, address: Address) -> Option<Node<S, M>> {
135 self.networks
136 .peek_with(&AddressRange::from(address), |_, node| node.clone())
137 }
138}
139
140impl<S, M> Node<S, M> {
141 pub(crate) fn new(
142 subnet_lease: Lease,
143 inbox_size: usize,
144 mailbox_tx: SubnetMailboxTx<S, M>,
145 ) -> Self {
146 let inbox_semaphore = Arc::new(Semaphore::new(inbox_size));
147 let subnet_lease = Arc::new(subnet_lease);
148 Self {
149 subnet_lease,
150 inbox_size,
151 inbox_semaphore,
152 mailbox_tx,
153 }
154 }
155}
156
157impl<S, M> Node<S, M> {
158 pub(crate) fn send(&self, to: Address, priority: bool, message: M) -> Result<(), ()> {
159 let Self {
160 inbox_semaphore,
161 mailbox_tx,
162 ..
163 } = self;
164 let SubnetMailboxTx {
165 tx_priority,
166 tx_regular,
167 subnet_notify,
168 ..
169 } = mailbox_tx;
170
171 let sent = if priority {
172 let message_without_permit = MessageWithoutPermit { to, message };
173 tx_priority
174 .try_send(message_without_permit)
175 .inspect_err(|e| log::warn!(reason = %e.as_display_chain(), "could not send via tx-priority"))
176 .map_err(|_e| ())?
177 } else {
178 let Ok(permit) = inbox_semaphore.clone().try_acquire_owned() else {
179 return Err(())
180 };
181 let message_with_permit = MessageWithPermit {
182 to,
183 message,
184 permit,
185 };
186 tx_regular
187 .try_send(message_with_permit)
188 .inspect_err(
189 |e| log::warn!(reason = %e.as_display_chain(), "could not send via tx-regular"),
190 )
191 .map_err(|_e| ())?
192 };
193 if sent {
194 log::trace!(subnet = %self.subnet_lease.net_address(), "notifying subnet");
195 subnet_notify.notify_one();
196 Ok(())
197 } else {
198 Err(())
199 }
200 }
201
202 pub(crate) fn sys_send(&self, sys_msg: S) -> Result<(), S> {
203 let Self { mailbox_tx, .. } = self;
204 let SubnetMailboxTx { tx_system, .. } = mailbox_tx;
205 tx_system.send(sys_msg).map_err(|e| e.0)
206 }
207}
208
209impl<S, M> Clone for Node<S, M> {
210 fn clone(&self) -> Self {
211 let Self {
212 subnet_lease,
213 inbox_size,
214 inbox_semaphore,
215 mailbox_tx,
216 } = self;
217 Self {
218 subnet_lease: subnet_lease.clone(),
219 inbox_size: *inbox_size,
220 inbox_semaphore: inbox_semaphore.clone(),
221 mailbox_tx: mailbox_tx.clone(),
222 }
223 }
224}
225
226impl<S, M> SubnetMailboxTx<S, M> {
227 pub(crate) fn downgrade(&self) -> WeakSubnetMailboxTx<S, M> {
228 let Self {
229 tx_system,
230 subnet_notify,
231 tx_priority,
232 tx_regular,
233 } = self;
234 let tx_system = tx_system.downgrade();
235 let subnet_notify = Arc::downgrade(subnet_notify);
236 let tx_priority = Arc::downgrade(tx_priority);
237 let tx_regular = Arc::downgrade(tx_regular);
238
239 WeakSubnetMailboxTx {
240 tx_system,
241 subnet_notify,
242 tx_priority,
243 tx_regular,
244 }
245 }
246}
247impl<S, M> WeakSubnetMailboxTx<S, M> {
248 pub(crate) fn upgrade(&self) -> Option<SubnetMailboxTx<S, M>> {
249 let Self {
250 tx_system,
251 subnet_notify,
252 tx_priority,
253 tx_regular,
254 } = self;
255 let tx_system = tx_system.upgrade()?;
256 let subnet_notify = subnet_notify.upgrade()?;
257 let tx_priority = tx_priority.upgrade()?;
258 let tx_regular = tx_regular.upgrade()?;
259
260 Some(SubnetMailboxTx {
261 tx_system,
262 subnet_notify,
263 tx_priority,
264 tx_regular,
265 })
266 }
267}
268
269impl<S, M> Clone for SubnetMailboxTx<S, M> {
270 fn clone(&self) -> Self {
271 let Self {
272 tx_system,
273 subnet_notify,
274 tx_priority,
275 tx_regular,
276 } = self;
277 Self {
278 tx_system: tx_system.clone(),
279 subnet_notify: subnet_notify.clone(),
280 tx_priority: tx_priority.clone(),
281 tx_regular: tx_regular.clone(),
282 }
283 }
284}
285impl<S, M> Clone for WeakSubnetMailboxTx<S, M> {
286 fn clone(&self) -> Self {
287 let Self {
288 tx_system,
289 subnet_notify,
290 tx_priority,
291 tx_regular,
292 } = self;
293 Self {
294 tx_system: tx_system.clone(),
295 subnet_notify: subnet_notify.clone(),
296 tx_priority: tx_priority.clone(),
297 tx_regular: tx_regular.clone(),
298 }
299 }
300}
301
302impl<S, M> Default for Registry<S, M> {
303 fn default() -> Self {
304 Self {
305 networks: Default::default(),
306 }
307 }
308}