mm1_node/
registry.rs

1#![allow(dead_code)]
2
3use std::sync::{Arc, Weak};
4
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::NetAddress;
9use mm1_common::errors::chain::StdErrorDisplayChainExt;
10use mm1_common::log;
11use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, mpsc};
12
13#[derive(derive_more::Debug)]
14pub(crate) struct Registry<S, M> {
15    #[debug(skip)]
16    networks: scc::TreeIndex<AddressRange, Node<S, M>>,
17}
18
19pub(crate) struct Node<S, M> {
20    subnet_lease:    Arc<Lease>,
21    inbox_size:      usize,
22    inbox_semaphore: Arc<Semaphore>,
23    mailbox_tx:      SubnetMailboxTx<S, M>,
24}
25
26pub(crate) fn new_mailbox<S, M>() -> (SubnetMailboxTx<S, M>, SubnetMailboxRx<S, M>) {
27    let subnet_notify = Arc::new(Notify::new());
28    let (tx_system, rx_system) = mpsc::unbounded_channel();
29    let (tx_priority, rx_priority) = kanal::unbounded();
30    let (tx_regular, rx_regular) = kanal::unbounded();
31
32    let tx_priority = tx_priority.into();
33    let tx_regular = tx_regular.into();
34
35    let tx = SubnetMailboxTx {
36        tx_system,
37        subnet_notify: subnet_notify.clone(),
38        tx_priority,
39        tx_regular,
40    };
41    let rx = SubnetMailboxRx {
42        rx_system,
43        subnet_notify,
44        rx_priority,
45        rx_regular,
46    };
47
48    (tx, rx)
49}
50
51pub(crate) struct SubnetMailboxTx<S, M> {
52    pub(crate) tx_system: mpsc::UnboundedSender<S>,
53
54    pub(crate) subnet_notify: Arc<Notify>,
55    pub(crate) tx_priority:   Arc<kanal::Sender<MessageWithoutPermit<M>>>,
56    pub(crate) tx_regular:    Arc<kanal::Sender<MessageWithPermit<M>>>,
57}
58
59pub(crate) struct WeakSubnetMailboxTx<S, M> {
60    pub(crate) tx_system: mpsc::WeakUnboundedSender<S>,
61
62    pub(crate) subnet_notify: Weak<Notify>,
63    pub(crate) tx_priority:   Weak<kanal::Sender<MessageWithoutPermit<M>>>,
64    pub(crate) tx_regular:    Weak<kanal::Sender<MessageWithPermit<M>>>,
65}
66
67pub(crate) struct SubnetMailboxRx<S, M> {
68    pub(crate) rx_system:     mpsc::UnboundedReceiver<S>,
69    pub(crate) subnet_notify: Arc<Notify>,
70    pub(crate) rx_priority:   kanal::Receiver<MessageWithoutPermit<M>>,
71    pub(crate) rx_regular:    kanal::Receiver<MessageWithPermit<M>>,
72}
73
74pub(crate) struct MessageWithoutPermit<M> {
75    pub(crate) to:      Address,
76    pub(crate) message: M,
77}
78
79pub(crate) struct MessageWithPermit<M> {
80    pub(crate) to:      Address,
81    pub(crate) message: M,
82    permit:             OwnedSemaphorePermit,
83}
84
85impl<S, M> Registry<S, M>
86where
87    S: 'static,
88    M: 'static,
89{
90    pub(crate) fn new() -> Self {
91        Default::default()
92    }
93
94    pub(crate) fn register(
95        &self,
96        subnet_address: NetAddress,
97        node: Node<S, M>,
98    ) -> Result<(), Node<S, M>> {
99        let address_range = AddressRange::from(subnet_address);
100        self.networks
101            .insert(address_range, node)
102            .inspect_err(
103                |(address_range, _node)| log::warn!(%address_range, "failed to bind address range"),
104            )
105            .map_err(|(_address_range, node)| node)?;
106        log::trace!(%address_range, "register: registered");
107        Ok(())
108    }
109
110    pub(crate) fn unregister(&self, subnet_address: NetAddress) -> bool {
111        let guard = Default::default();
112        let sought_range = AddressRange::from(subnet_address);
113        let Some((found_range, _)) = self.networks.peek_entry(&sought_range, &guard) else {
114            log::trace!(
115                %sought_range,
116                "unregister: sought-range not found"
117            );
118            return false
119        };
120        if *found_range != sought_range {
121            log::error!(
122                %sought_range,
123                %found_range,
124                "unregister: sought-range is not equal to the found range"
125            );
126            return false
127        }
128        let removed = self.networks.remove(&sought_range);
129        log::trace!(%sought_range, %removed, "unregister: removing range");
130
131        removed
132    }
133
134    pub(crate) fn lookup(&self, address: Address) -> Option<Node<S, M>> {
135        self.networks
136            .peek_with(&AddressRange::from(address), |_, node| node.clone())
137    }
138}
139
140impl<S, M> Node<S, M> {
141    pub(crate) fn new(
142        subnet_lease: Lease,
143        inbox_size: usize,
144        mailbox_tx: SubnetMailboxTx<S, M>,
145    ) -> Self {
146        let inbox_semaphore = Arc::new(Semaphore::new(inbox_size));
147        let subnet_lease = Arc::new(subnet_lease);
148        Self {
149            subnet_lease,
150            inbox_size,
151            inbox_semaphore,
152            mailbox_tx,
153        }
154    }
155}
156
157impl<S, M> Node<S, M> {
158    pub(crate) fn send(&self, to: Address, priority: bool, message: M) -> Result<(), ()> {
159        let Self {
160            inbox_semaphore,
161            mailbox_tx,
162            ..
163        } = self;
164        let SubnetMailboxTx {
165            tx_priority,
166            tx_regular,
167            subnet_notify,
168            ..
169        } = mailbox_tx;
170
171        let sent = if priority {
172            let message_without_permit = MessageWithoutPermit { to, message };
173            tx_priority
174                .try_send(message_without_permit)
175                .inspect_err(|e| log::warn!(reason = %e.as_display_chain(), "could not send via tx-priority"))
176                .map_err(|_e| ())?
177        } else {
178            let Ok(permit) = inbox_semaphore.clone().try_acquire_owned() else {
179                return Err(())
180            };
181            let message_with_permit = MessageWithPermit {
182                to,
183                message,
184                permit,
185            };
186            tx_regular
187                .try_send(message_with_permit)
188                .inspect_err(
189                    |e| log::warn!(reason = %e.as_display_chain(), "could not send via tx-regular"),
190                )
191                .map_err(|_e| ())?
192        };
193        if sent {
194            log::trace!(subnet = %self.subnet_lease.net_address(), "notifying subnet");
195            subnet_notify.notify_one();
196            Ok(())
197        } else {
198            Err(())
199        }
200    }
201
202    pub(crate) fn sys_send(&self, sys_msg: S) -> Result<(), S> {
203        let Self { mailbox_tx, .. } = self;
204        let SubnetMailboxTx { tx_system, .. } = mailbox_tx;
205        tx_system.send(sys_msg).map_err(|e| e.0)
206    }
207}
208
209impl<S, M> Clone for Node<S, M> {
210    fn clone(&self) -> Self {
211        let Self {
212            subnet_lease,
213            inbox_size,
214            inbox_semaphore,
215            mailbox_tx,
216        } = self;
217        Self {
218            subnet_lease:    subnet_lease.clone(),
219            inbox_size:      *inbox_size,
220            inbox_semaphore: inbox_semaphore.clone(),
221            mailbox_tx:      mailbox_tx.clone(),
222        }
223    }
224}
225
226impl<S, M> SubnetMailboxTx<S, M> {
227    pub(crate) fn downgrade(&self) -> WeakSubnetMailboxTx<S, M> {
228        let Self {
229            tx_system,
230            subnet_notify,
231            tx_priority,
232            tx_regular,
233        } = self;
234        let tx_system = tx_system.downgrade();
235        let subnet_notify = Arc::downgrade(subnet_notify);
236        let tx_priority = Arc::downgrade(tx_priority);
237        let tx_regular = Arc::downgrade(tx_regular);
238
239        WeakSubnetMailboxTx {
240            tx_system,
241            subnet_notify,
242            tx_priority,
243            tx_regular,
244        }
245    }
246}
247impl<S, M> WeakSubnetMailboxTx<S, M> {
248    pub(crate) fn upgrade(&self) -> Option<SubnetMailboxTx<S, M>> {
249        let Self {
250            tx_system,
251            subnet_notify,
252            tx_priority,
253            tx_regular,
254        } = self;
255        let tx_system = tx_system.upgrade()?;
256        let subnet_notify = subnet_notify.upgrade()?;
257        let tx_priority = tx_priority.upgrade()?;
258        let tx_regular = tx_regular.upgrade()?;
259
260        Some(SubnetMailboxTx {
261            tx_system,
262            subnet_notify,
263            tx_priority,
264            tx_regular,
265        })
266    }
267}
268
269impl<S, M> Clone for SubnetMailboxTx<S, M> {
270    fn clone(&self) -> Self {
271        let Self {
272            tx_system,
273            subnet_notify,
274            tx_priority,
275            tx_regular,
276        } = self;
277        Self {
278            tx_system:     tx_system.clone(),
279            subnet_notify: subnet_notify.clone(),
280            tx_priority:   tx_priority.clone(),
281            tx_regular:    tx_regular.clone(),
282        }
283    }
284}
285impl<S, M> Clone for WeakSubnetMailboxTx<S, M> {
286    fn clone(&self) -> Self {
287        let Self {
288            tx_system,
289            subnet_notify,
290            tx_priority,
291            tx_regular,
292        } = self;
293        Self {
294            tx_system:     tx_system.clone(),
295            subnet_notify: subnet_notify.clone(),
296            tx_priority:   tx_priority.clone(),
297            tx_regular:    tx_regular.clone(),
298        }
299    }
300}
301
302impl<S, M> Default for Registry<S, M> {
303    fn default() -> Self {
304        Self {
305            networks: Default::default(),
306        }
307    }
308}