1#![allow(dead_code)]
2
3use std::sync::{Arc, Weak};
4
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::NetAddress;
9use mm1_common::log;
10use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, mpsc};
11
12#[derive(derive_more::Debug)]
13pub(crate) struct Registry<S, M> {
14 #[debug(skip)]
15 networks: scc::TreeIndex<AddressRange, Node<S, M>>,
16}
17
18pub(crate) struct Node<S, M> {
19 subnet_lease: Arc<Lease>,
20 inbox_size: usize,
21 inbox_semaphore: Arc<Semaphore>,
22 mailbox_tx: SubnetMailboxTx<S, M>,
23}
24
25pub(crate) fn new_mailbox<S, M>() -> (SubnetMailboxTx<S, M>, SubnetMailboxRx<S, M>) {
26 let subnet_notify = Arc::new(Notify::new());
27 let (tx_system, rx_system) = mpsc::unbounded_channel();
28 let (tx_priority, rx_priority) = kanal::unbounded();
29 let (tx_regular, rx_regular) = kanal::unbounded();
30
31 let tx_priority = tx_priority.into();
32 let tx_regular = tx_regular.into();
33
34 let tx = SubnetMailboxTx {
35 tx_system,
36 subnet_notify: subnet_notify.clone(),
37 tx_priority,
38 tx_regular,
39 };
40 let rx = SubnetMailboxRx {
41 rx_system,
42 subnet_notify,
43 rx_priority,
44 rx_regular,
45 };
46
47 (tx, rx)
48}
49
50pub(crate) struct SubnetMailboxTx<S, M> {
51 pub(crate) tx_system: mpsc::UnboundedSender<S>,
52
53 pub(crate) subnet_notify: Arc<Notify>,
54 pub(crate) tx_priority: Arc<kanal::Sender<MessageWithoutPermit<M>>>,
55 pub(crate) tx_regular: Arc<kanal::Sender<MessageWithPermit<M>>>,
56}
57
58pub(crate) struct WeakSubnetMailboxTx<S, M> {
59 pub(crate) tx_system: mpsc::WeakUnboundedSender<S>,
60
61 pub(crate) subnet_notify: Weak<Notify>,
62 pub(crate) tx_priority: Weak<kanal::Sender<MessageWithoutPermit<M>>>,
63 pub(crate) tx_regular: Weak<kanal::Sender<MessageWithPermit<M>>>,
64}
65
66pub(crate) struct SubnetMailboxRx<S, M> {
67 pub(crate) rx_system: mpsc::UnboundedReceiver<S>,
68 pub(crate) subnet_notify: Arc<Notify>,
69 pub(crate) rx_priority: kanal::Receiver<MessageWithoutPermit<M>>,
70 pub(crate) rx_regular: kanal::Receiver<MessageWithPermit<M>>,
71}
72
73pub(crate) struct MessageWithoutPermit<M> {
74 pub(crate) to: Address,
75 pub(crate) message: M,
76}
77
78pub(crate) struct MessageWithPermit<M> {
79 pub(crate) to: Address,
80 pub(crate) message: M,
81 permit: OwnedSemaphorePermit,
82}
83
84impl<S, M> Registry<S, M>
85where
86 S: 'static,
87 M: 'static,
88{
89 pub(crate) fn new() -> Self {
90 Default::default()
91 }
92
93 pub(crate) fn register(
94 &self,
95 subnet_address: NetAddress,
96 node: Node<S, M>,
97 ) -> Result<(), Node<S, M>> {
98 let address_range = AddressRange::from(subnet_address);
99 self.networks
100 .insert(address_range, node)
101 .inspect_err(|(address_range, _node)| {
102 log::warn!("failed to bind address range: {}", address_range)
103 })
104 .map_err(|(_address_range, node)| node)?;
105 log::trace!("register: registered {}", address_range);
106 Ok(())
107 }
108
109 pub(crate) fn unregister(&self, subnet_address: NetAddress) -> bool {
110 let guard = Default::default();
111 let sought_range = AddressRange::from(subnet_address);
112 let Some((found_range, _)) = self.networks.peek_entry(&sought_range, &guard) else {
113 log::trace!(
114 "unregister: sought-range not found [sought-range: {}]",
115 sought_range
116 );
117 return false
118 };
119 if *found_range != sought_range {
120 log::error!(
121 "unregister: sought-range is not equal to the found range [sought: {}; found: {}]",
122 sought_range,
123 found_range
124 );
125 return false
126 }
127 let removed = self.networks.remove(&sought_range);
128 log::trace!("unregister: range {} removed — {}", sought_range, removed);
129
130 removed
131 }
132
133 pub(crate) fn lookup(&self, address: Address) -> Option<Node<S, M>> {
134 self.networks
135 .peek_with(&AddressRange::from(address), |_, node| node.clone())
136 }
137}
138
139impl<S, M> Node<S, M> {
140 pub(crate) fn new(
141 subnet_lease: Lease,
142 inbox_size: usize,
143 mailbox_tx: SubnetMailboxTx<S, M>,
144 ) -> Self {
145 let inbox_semaphore = Arc::new(Semaphore::new(inbox_size));
146 let subnet_lease = Arc::new(subnet_lease);
147 Self {
148 subnet_lease,
149 inbox_size,
150 inbox_semaphore,
151 mailbox_tx,
152 }
153 }
154}
155
156impl<S, M> Node<S, M> {
157 pub(crate) fn send(&self, to: Address, priority: bool, message: M) -> Result<(), ()> {
158 let Self {
159 inbox_semaphore,
160 mailbox_tx,
161 ..
162 } = self;
163 let SubnetMailboxTx {
164 tx_priority,
165 tx_regular,
166 subnet_notify,
167 ..
168 } = mailbox_tx;
169
170 let sent = if priority {
171 let message_without_permit = MessageWithoutPermit { to, message };
172 tx_priority
173 .try_send(message_without_permit)
174 .map_err(|_e| ())?
175 } else {
176 let Ok(permit) = inbox_semaphore.clone().try_acquire_owned() else {
177 return Err(())
178 };
179 let message_with_permit = MessageWithPermit {
180 to,
181 message,
182 permit,
183 };
184 tx_regular.try_send(message_with_permit).map_err(|_e| ())?
185 };
186 if sent {
187 subnet_notify.notify_one();
188 Ok(())
189 } else {
190 Err(())
191 }
192 }
193
194 pub(crate) fn sys_send(&self, sys_msg: S) -> Result<(), S> {
195 let Self { mailbox_tx, .. } = self;
196 let SubnetMailboxTx { tx_system, .. } = mailbox_tx;
197 tx_system.send(sys_msg).map_err(|e| e.0)
198 }
199}
200
201impl<S, M> Clone for Node<S, M> {
202 fn clone(&self) -> Self {
203 let Self {
204 subnet_lease,
205 inbox_size,
206 inbox_semaphore,
207 mailbox_tx,
208 } = self;
209 Self {
210 subnet_lease: subnet_lease.clone(),
211 inbox_size: *inbox_size,
212 inbox_semaphore: inbox_semaphore.clone(),
213 mailbox_tx: mailbox_tx.clone(),
214 }
215 }
216}
217
218impl<S, M> SubnetMailboxTx<S, M> {
219 pub(crate) fn downgrade(&self) -> WeakSubnetMailboxTx<S, M> {
220 let Self {
221 tx_system,
222 subnet_notify,
223 tx_priority,
224 tx_regular,
225 } = self;
226 let tx_system = tx_system.downgrade();
227 let subnet_notify = Arc::downgrade(subnet_notify);
228 let tx_priority = Arc::downgrade(tx_priority);
229 let tx_regular = Arc::downgrade(tx_regular);
230
231 WeakSubnetMailboxTx {
232 tx_system,
233 subnet_notify,
234 tx_priority,
235 tx_regular,
236 }
237 }
238}
239impl<S, M> WeakSubnetMailboxTx<S, M> {
240 pub(crate) fn upgrade(&self) -> Option<SubnetMailboxTx<S, M>> {
241 let Self {
242 tx_system,
243 subnet_notify,
244 tx_priority,
245 tx_regular,
246 } = self;
247 let tx_system = tx_system.upgrade()?;
248 let subnet_notify = subnet_notify.upgrade()?;
249 let tx_priority = tx_priority.upgrade()?;
250 let tx_regular = tx_regular.upgrade()?;
251
252 Some(SubnetMailboxTx {
253 tx_system,
254 subnet_notify,
255 tx_priority,
256 tx_regular,
257 })
258 }
259}
260
261impl<S, M> Clone for SubnetMailboxTx<S, M> {
262 fn clone(&self) -> Self {
263 let Self {
264 tx_system,
265 subnet_notify,
266 tx_priority,
267 tx_regular,
268 } = self;
269 Self {
270 tx_system: tx_system.clone(),
271 subnet_notify: subnet_notify.clone(),
272 tx_priority: tx_priority.clone(),
273 tx_regular: tx_regular.clone(),
274 }
275 }
276}
277impl<S, M> Clone for WeakSubnetMailboxTx<S, M> {
278 fn clone(&self) -> Self {
279 let Self {
280 tx_system,
281 subnet_notify,
282 tx_priority,
283 tx_regular,
284 } = self;
285 Self {
286 tx_system: tx_system.clone(),
287 subnet_notify: subnet_notify.clone(),
288 tx_priority: tx_priority.clone(),
289 tx_regular: tx_regular.clone(),
290 }
291 }
292}
293
294impl<S, M> Default for Registry<S, M> {
295 fn default() -> Self {
296 Self {
297 networks: Default::default(),
298 }
299 }
300}