mm1_node/
registry.rs

1#![allow(dead_code)]
2
3use std::sync::{Arc, Weak};
4
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::NetAddress;
9use mm1_common::log;
10use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, mpsc};
11
12#[derive(derive_more::Debug)]
13pub(crate) struct Registry<S, M> {
14    #[debug(skip)]
15    networks: scc::TreeIndex<AddressRange, Node<S, M>>,
16}
17
18pub(crate) struct Node<S, M> {
19    subnet_lease:    Arc<Lease>,
20    inbox_size:      usize,
21    inbox_semaphore: Arc<Semaphore>,
22    mailbox_tx:      SubnetMailboxTx<S, M>,
23}
24
25pub(crate) fn new_mailbox<S, M>() -> (SubnetMailboxTx<S, M>, SubnetMailboxRx<S, M>) {
26    let subnet_notify = Arc::new(Notify::new());
27    let (tx_system, rx_system) = mpsc::unbounded_channel();
28    let (tx_priority, rx_priority) = kanal::unbounded();
29    let (tx_regular, rx_regular) = kanal::unbounded();
30
31    let tx_priority = tx_priority.into();
32    let tx_regular = tx_regular.into();
33
34    let tx = SubnetMailboxTx {
35        tx_system,
36        subnet_notify: subnet_notify.clone(),
37        tx_priority,
38        tx_regular,
39    };
40    let rx = SubnetMailboxRx {
41        rx_system,
42        subnet_notify,
43        rx_priority,
44        rx_regular,
45    };
46
47    (tx, rx)
48}
49
50pub(crate) struct SubnetMailboxTx<S, M> {
51    pub(crate) tx_system: mpsc::UnboundedSender<S>,
52
53    pub(crate) subnet_notify: Arc<Notify>,
54    pub(crate) tx_priority:   Arc<kanal::Sender<MessageWithoutPermit<M>>>,
55    pub(crate) tx_regular:    Arc<kanal::Sender<MessageWithPermit<M>>>,
56}
57
58pub(crate) struct WeakSubnetMailboxTx<S, M> {
59    pub(crate) tx_system: mpsc::WeakUnboundedSender<S>,
60
61    pub(crate) subnet_notify: Weak<Notify>,
62    pub(crate) tx_priority:   Weak<kanal::Sender<MessageWithoutPermit<M>>>,
63    pub(crate) tx_regular:    Weak<kanal::Sender<MessageWithPermit<M>>>,
64}
65
66pub(crate) struct SubnetMailboxRx<S, M> {
67    pub(crate) rx_system:     mpsc::UnboundedReceiver<S>,
68    pub(crate) subnet_notify: Arc<Notify>,
69    pub(crate) rx_priority:   kanal::Receiver<MessageWithoutPermit<M>>,
70    pub(crate) rx_regular:    kanal::Receiver<MessageWithPermit<M>>,
71}
72
73pub(crate) struct MessageWithoutPermit<M> {
74    pub(crate) to:      Address,
75    pub(crate) message: M,
76}
77
78pub(crate) struct MessageWithPermit<M> {
79    pub(crate) to:      Address,
80    pub(crate) message: M,
81    permit:             OwnedSemaphorePermit,
82}
83
84impl<S, M> Registry<S, M>
85where
86    S: 'static,
87    M: 'static,
88{
89    pub(crate) fn new() -> Self {
90        Default::default()
91    }
92
93    pub(crate) fn register(
94        &self,
95        subnet_address: NetAddress,
96        node: Node<S, M>,
97    ) -> Result<(), Node<S, M>> {
98        let address_range = AddressRange::from(subnet_address);
99        self.networks
100            .insert(address_range, node)
101            .inspect_err(|(address_range, _node)| {
102                log::warn!("failed to bind address range: {}", address_range)
103            })
104            .map_err(|(_address_range, node)| node)?;
105        log::trace!("register: registered {}", address_range);
106        Ok(())
107    }
108
109    pub(crate) fn unregister(&self, subnet_address: NetAddress) -> bool {
110        let guard = Default::default();
111        let sought_range = AddressRange::from(subnet_address);
112        let Some((found_range, _)) = self.networks.peek_entry(&sought_range, &guard) else {
113            log::trace!(
114                "unregister: sought-range not found [sought-range: {}]",
115                sought_range
116            );
117            return false
118        };
119        if *found_range != sought_range {
120            log::error!(
121                "unregister: sought-range is not equal to the found range [sought: {}; found: {}]",
122                sought_range,
123                found_range
124            );
125            return false
126        }
127        let removed = self.networks.remove(&sought_range);
128        log::trace!("unregister: range {} removed — {}", sought_range, removed);
129
130        removed
131    }
132
133    pub(crate) fn lookup(&self, address: Address) -> Option<Node<S, M>> {
134        self.networks
135            .peek_with(&AddressRange::from(address), |_, node| node.clone())
136    }
137}
138
139impl<S, M> Node<S, M> {
140    pub(crate) fn new(
141        subnet_lease: Lease,
142        inbox_size: usize,
143        mailbox_tx: SubnetMailboxTx<S, M>,
144    ) -> Self {
145        let inbox_semaphore = Arc::new(Semaphore::new(inbox_size));
146        let subnet_lease = Arc::new(subnet_lease);
147        Self {
148            subnet_lease,
149            inbox_size,
150            inbox_semaphore,
151            mailbox_tx,
152        }
153    }
154}
155
156impl<S, M> Node<S, M> {
157    pub(crate) fn send(&self, to: Address, priority: bool, message: M) -> Result<(), ()> {
158        let Self {
159            inbox_semaphore,
160            mailbox_tx,
161            ..
162        } = self;
163        let SubnetMailboxTx {
164            tx_priority,
165            tx_regular,
166            subnet_notify,
167            ..
168        } = mailbox_tx;
169
170        let sent = if priority {
171            let message_without_permit = MessageWithoutPermit { to, message };
172            tx_priority
173                .try_send(message_without_permit)
174                .map_err(|_e| ())?
175        } else {
176            let Ok(permit) = inbox_semaphore.clone().try_acquire_owned() else {
177                return Err(())
178            };
179            let message_with_permit = MessageWithPermit {
180                to,
181                message,
182                permit,
183            };
184            tx_regular.try_send(message_with_permit).map_err(|_e| ())?
185        };
186        if sent {
187            subnet_notify.notify_one();
188            Ok(())
189        } else {
190            Err(())
191        }
192    }
193
194    pub(crate) fn sys_send(&self, sys_msg: S) -> Result<(), S> {
195        let Self { mailbox_tx, .. } = self;
196        let SubnetMailboxTx { tx_system, .. } = mailbox_tx;
197        tx_system.send(sys_msg).map_err(|e| e.0)
198    }
199}
200
201impl<S, M> Clone for Node<S, M> {
202    fn clone(&self) -> Self {
203        let Self {
204            subnet_lease,
205            inbox_size,
206            inbox_semaphore,
207            mailbox_tx,
208        } = self;
209        Self {
210            subnet_lease:    subnet_lease.clone(),
211            inbox_size:      *inbox_size,
212            inbox_semaphore: inbox_semaphore.clone(),
213            mailbox_tx:      mailbox_tx.clone(),
214        }
215    }
216}
217
218impl<S, M> SubnetMailboxTx<S, M> {
219    pub(crate) fn downgrade(&self) -> WeakSubnetMailboxTx<S, M> {
220        let Self {
221            tx_system,
222            subnet_notify,
223            tx_priority,
224            tx_regular,
225        } = self;
226        let tx_system = tx_system.downgrade();
227        let subnet_notify = Arc::downgrade(subnet_notify);
228        let tx_priority = Arc::downgrade(tx_priority);
229        let tx_regular = Arc::downgrade(tx_regular);
230
231        WeakSubnetMailboxTx {
232            tx_system,
233            subnet_notify,
234            tx_priority,
235            tx_regular,
236        }
237    }
238}
239impl<S, M> WeakSubnetMailboxTx<S, M> {
240    pub(crate) fn upgrade(&self) -> Option<SubnetMailboxTx<S, M>> {
241        let Self {
242            tx_system,
243            subnet_notify,
244            tx_priority,
245            tx_regular,
246        } = self;
247        let tx_system = tx_system.upgrade()?;
248        let subnet_notify = subnet_notify.upgrade()?;
249        let tx_priority = tx_priority.upgrade()?;
250        let tx_regular = tx_regular.upgrade()?;
251
252        Some(SubnetMailboxTx {
253            tx_system,
254            subnet_notify,
255            tx_priority,
256            tx_regular,
257        })
258    }
259}
260
261impl<S, M> Clone for SubnetMailboxTx<S, M> {
262    fn clone(&self) -> Self {
263        let Self {
264            tx_system,
265            subnet_notify,
266            tx_priority,
267            tx_regular,
268        } = self;
269        Self {
270            tx_system:     tx_system.clone(),
271            subnet_notify: subnet_notify.clone(),
272            tx_priority:   tx_priority.clone(),
273            tx_regular:    tx_regular.clone(),
274        }
275    }
276}
277impl<S, M> Clone for WeakSubnetMailboxTx<S, M> {
278    fn clone(&self) -> Self {
279        let Self {
280            tx_system,
281            subnet_notify,
282            tx_priority,
283            tx_regular,
284        } = self;
285        Self {
286            tx_system:     tx_system.clone(),
287            subnet_notify: subnet_notify.clone(),
288            tx_priority:   tx_priority.clone(),
289            tx_regular:    tx_regular.clone(),
290        }
291    }
292}
293
294impl<S, M> Default for Registry<S, M> {
295    fn default() -> Self {
296        Self {
297            networks: Default::default(),
298        }
299    }
300}