mm1_node/
registry.rs

1#![allow(dead_code)]
2
3use std::sync::Arc;
4
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::{Lease, LeaseError, Pool};
8use mm1_address::subnet::{NetAddress, NetMask};
9use mm1_common::log;
10use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc};
11
12#[derive(derive_more::Debug)]
13pub(crate) struct Registry<S, M> {
14    #[debug(skip)]
15    networks: scc::TreeIndex<AddressRange, Node<S, M>>,
16}
17
18pub(crate) struct ActorNode<S, M> {
19    subnet_lease:         Lease,
20    subnet_pool:          Pool,
21    sem_regular_messages: Arc<Semaphore>,
22    forks:                scc::HashMap<Address, ForkEntry<M>>,
23
24    pub tx_system: mpsc::UnboundedSender<S>,
25}
26
27pub(crate) struct NetworkNode<S, M> {
28    subnet_lease:         Lease,
29    sem_regular_messages: Arc<Semaphore>,
30    tx_system:            mpsc::UnboundedSender<S>,
31    tx_priority:          mpsc::UnboundedSender<M>,
32    tx_regular:           mpsc::UnboundedSender<MessageWithPermit<M>>,
33}
34
35pub(crate) struct MessageWithPermit<M> {
36    pub(crate) message: M,
37    permit:             OwnedSemaphorePermit,
38}
39
40#[derive(derive_more::From)]
41pub(crate) enum Node<S, M> {
42    Actor(Arc<ActorNode<S, M>>),
43    Network(Arc<NetworkNode<S, M>>),
44}
45
46pub(crate) struct ForkEntry<M> {
47    fork_lease:  Lease,
48    tx_priority: mpsc::UnboundedSender<M>,
49    tx_regular:  mpsc::UnboundedSender<MessageWithPermit<M>>,
50}
51
52impl<S, M> Registry<S, M>
53where
54    S: 'static,
55    M: 'static,
56{
57    pub(crate) fn new() -> Self {
58        Default::default()
59    }
60
61    pub(crate) fn register(&self, subnet_address: NetAddress, node: impl Into<Node<S, M>>) -> bool {
62        let address_range = AddressRange::from(subnet_address);
63        self.networks
64            .insert(address_range, node.into())
65            .inspect_err(|(address_range, _)| {
66                log::warn!("failed to bind address range: {}", address_range)
67            })
68            .is_ok()
69    }
70
71    pub(crate) fn unregister(&self, subnet_address: NetAddress) -> bool {
72        let guard = Default::default();
73        let sought_range = AddressRange::from(subnet_address);
74        let Some((found_range, _)) = self.networks.peek_entry(&sought_range, &guard) else {
75            return false
76        };
77        if *found_range != sought_range {
78            return false
79        }
80        self.networks.remove(&sought_range)
81    }
82
83    pub(crate) fn lookup(&self, address: Address) -> Option<Node<S, M>> {
84        self.networks
85            .peek_with(&AddressRange::from(address), |_, node| node.clone())
86    }
87}
88
89impl<S, M> ActorNode<S, M> {
90    pub(crate) fn new(
91        subnet_lease: Lease,
92        inbox_size: usize,
93        tx_system: mpsc::UnboundedSender<S>,
94    ) -> Self {
95        let subnet_pool = Pool::new(subnet_lease.net_address());
96        let sem_regular_messages = Arc::new(Semaphore::new(inbox_size));
97        Self {
98            subnet_lease,
99            subnet_pool,
100            sem_regular_messages,
101            tx_system,
102            forks: Default::default(),
103        }
104    }
105
106    pub(crate) fn lease_address(&self) -> Result<Lease, LeaseError> {
107        self.subnet_pool.lease(NetMask::M_64)
108    }
109
110    pub(crate) fn register(&self, address: Address, fork_entry: ForkEntry<M>) -> bool {
111        self.forks.insert(address, fork_entry).is_ok()
112    }
113
114    pub(crate) fn unregister(&self, address: Address) -> Option<Lease> {
115        let (_, ForkEntry { fork_lease, .. }) = self.forks.remove(&address)?;
116        Some(fork_lease)
117    }
118}
119
120impl<M> ForkEntry<M> {
121    pub(crate) fn new(
122        fork_lease: Lease,
123        tx_priority: mpsc::UnboundedSender<M>,
124        tx_regular: mpsc::UnboundedSender<MessageWithPermit<M>>,
125    ) -> Self {
126        assert_eq!(fork_lease.net_address().mask, NetMask::M_64);
127        Self {
128            fork_lease,
129            tx_priority,
130            tx_regular,
131        }
132    }
133}
134
135impl<S, M> NetworkNode<S, M> {
136    pub(crate) fn new(
137        subnet_lease: Lease,
138        inbox_size: usize,
139        tx_system: mpsc::UnboundedSender<S>,
140        tx_priority: mpsc::UnboundedSender<M>,
141        tx_regular: mpsc::UnboundedSender<MessageWithPermit<M>>,
142    ) -> Self {
143        let sem_regular_messages = Arc::new(Semaphore::new(inbox_size));
144        Self {
145            subnet_lease,
146            sem_regular_messages,
147            tx_system,
148            tx_priority,
149            tx_regular,
150        }
151    }
152}
153
154impl<S, M> Node<S, M> {
155    pub(crate) fn send(&self, to: Address, priority: bool, message: M) -> Result<(), M> {
156        let (sem, chans) = match self {
157            Self::Actor(actor) => {
158                (
159                    actor.sem_regular_messages.clone(),
160                    actor
161                        .forks
162                        .get(&to)
163                        .map(|f| (f.tx_priority.clone(), f.tx_regular.clone())),
164                )
165            },
166            Self::Network(network) => {
167                (
168                    network.sem_regular_messages.clone(),
169                    Some((network.tx_priority.clone(), network.tx_regular.clone())),
170                )
171            },
172        };
173        let Some((tx_priority, tx_regular)) = chans else {
174            return Err(message)
175        };
176        if priority {
177            tx_priority.send(message).map_err(|e| e.0)?;
178        } else {
179            let Ok(permit) = sem.try_acquire_owned() else {
180                return Err(message)
181            };
182            let message_with_permit = MessageWithPermit { message, permit };
183            tx_regular
184                .send(message_with_permit)
185                .map_err(|e| e.0.message)?;
186        }
187        Ok(())
188    }
189
190    pub(crate) fn sys_send(&self, sys_msg: S) -> Result<(), S> {
191        let tx_system = match self {
192            Self::Actor(a) => &a.tx_system,
193            Self::Network(n) => &n.tx_system,
194        };
195        tx_system.send(sys_msg).map_err(|e| e.0)
196    }
197}
198
199impl<S, M> Clone for Node<S, M> {
200    fn clone(&self) -> Self {
201        match self {
202            Self::Actor(a) => Self::Actor(a.clone()),
203            Self::Network(n) => Self::Network(n.clone()),
204        }
205    }
206}
207
208impl<S, M> Default for Registry<S, M> {
209    fn default() -> Self {
210        Self {
211            networks: Default::default(),
212        }
213    }
214}