1#![allow(dead_code)]
2
3use std::sync::Arc;
4
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::{Lease, LeaseError, Pool};
8use mm1_address::subnet::{NetAddress, NetMask};
9use mm1_common::log;
10use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc};
11
12#[derive(derive_more::Debug)]
13pub(crate) struct Registry<S, M> {
14 #[debug(skip)]
15 networks: scc::TreeIndex<AddressRange, Node<S, M>>,
16}
17
18pub(crate) struct ActorNode<S, M> {
19 subnet_lease: Lease,
20 subnet_pool: Pool,
21 sem_regular_messages: Arc<Semaphore>,
22 forks: scc::HashMap<Address, ForkEntry<M>>,
23
24 pub tx_system: mpsc::UnboundedSender<S>,
25}
26
27pub(crate) struct NetworkNode<S, M> {
28 subnet_lease: Lease,
29 sem_regular_messages: Arc<Semaphore>,
30 tx_system: mpsc::UnboundedSender<S>,
31 tx_priority: mpsc::UnboundedSender<M>,
32 tx_regular: mpsc::UnboundedSender<MessageWithPermit<M>>,
33}
34
35pub(crate) struct MessageWithPermit<M> {
36 pub(crate) message: M,
37 permit: OwnedSemaphorePermit,
38}
39
40#[derive(derive_more::From)]
41pub(crate) enum Node<S, M> {
42 Actor(Arc<ActorNode<S, M>>),
43 Network(Arc<NetworkNode<S, M>>),
44}
45
46pub(crate) struct ForkEntry<M> {
47 fork_lease: Lease,
48 tx_priority: mpsc::UnboundedSender<M>,
49 tx_regular: mpsc::UnboundedSender<MessageWithPermit<M>>,
50}
51
52impl<S, M> Registry<S, M>
53where
54 S: 'static,
55 M: 'static,
56{
57 pub(crate) fn new() -> Self {
58 Default::default()
59 }
60
61 pub(crate) fn register(&self, subnet_address: NetAddress, node: impl Into<Node<S, M>>) -> bool {
62 let address_range = AddressRange::from(subnet_address);
63 self.networks
64 .insert(address_range, node.into())
65 .inspect_err(|(address_range, _)| {
66 log::warn!("failed to bind address range: {}", address_range)
67 })
68 .is_ok()
69 }
70
71 pub(crate) fn unregister(&self, subnet_address: NetAddress) -> bool {
72 let guard = Default::default();
73 let sought_range = AddressRange::from(subnet_address);
74 let Some((found_range, _)) = self.networks.peek_entry(&sought_range, &guard) else {
75 return false
76 };
77 if *found_range != sought_range {
78 return false
79 }
80 self.networks.remove(&sought_range)
81 }
82
83 pub(crate) fn lookup(&self, address: Address) -> Option<Node<S, M>> {
84 self.networks
85 .peek_with(&AddressRange::from(address), |_, node| node.clone())
86 }
87}
88
89impl<S, M> ActorNode<S, M> {
90 pub(crate) fn new(
91 subnet_lease: Lease,
92 inbox_size: usize,
93 tx_system: mpsc::UnboundedSender<S>,
94 ) -> Self {
95 let subnet_pool = Pool::new(subnet_lease.net_address());
96 let sem_regular_messages = Arc::new(Semaphore::new(inbox_size));
97 Self {
98 subnet_lease,
99 subnet_pool,
100 sem_regular_messages,
101 tx_system,
102 forks: Default::default(),
103 }
104 }
105
106 pub(crate) fn lease_address(&self) -> Result<Lease, LeaseError> {
107 self.subnet_pool.lease(NetMask::M_64)
108 }
109
110 pub(crate) fn register(&self, address: Address, fork_entry: ForkEntry<M>) -> bool {
111 self.forks.insert(address, fork_entry).is_ok()
112 }
113
114 pub(crate) fn unregister(&self, address: Address) -> Option<Lease> {
115 let (_, ForkEntry { fork_lease, .. }) = self.forks.remove(&address)?;
116 Some(fork_lease)
117 }
118}
119
120impl<M> ForkEntry<M> {
121 pub(crate) fn new(
122 fork_lease: Lease,
123 tx_priority: mpsc::UnboundedSender<M>,
124 tx_regular: mpsc::UnboundedSender<MessageWithPermit<M>>,
125 ) -> Self {
126 assert_eq!(fork_lease.net_address().mask, NetMask::M_64);
127 Self {
128 fork_lease,
129 tx_priority,
130 tx_regular,
131 }
132 }
133}
134
135impl<S, M> NetworkNode<S, M> {
136 pub(crate) fn new(
137 subnet_lease: Lease,
138 inbox_size: usize,
139 tx_system: mpsc::UnboundedSender<S>,
140 tx_priority: mpsc::UnboundedSender<M>,
141 tx_regular: mpsc::UnboundedSender<MessageWithPermit<M>>,
142 ) -> Self {
143 let sem_regular_messages = Arc::new(Semaphore::new(inbox_size));
144 Self {
145 subnet_lease,
146 sem_regular_messages,
147 tx_system,
148 tx_priority,
149 tx_regular,
150 }
151 }
152}
153
154impl<S, M> Node<S, M> {
155 pub(crate) fn send(&self, to: Address, priority: bool, message: M) -> Result<(), M> {
156 let (sem, chans) = match self {
157 Self::Actor(actor) => {
158 (
159 actor.sem_regular_messages.clone(),
160 actor
161 .forks
162 .get(&to)
163 .map(|f| (f.tx_priority.clone(), f.tx_regular.clone())),
164 )
165 },
166 Self::Network(network) => {
167 (
168 network.sem_regular_messages.clone(),
169 Some((network.tx_priority.clone(), network.tx_regular.clone())),
170 )
171 },
172 };
173 let Some((tx_priority, tx_regular)) = chans else {
174 return Err(message)
175 };
176 if priority {
177 tx_priority.send(message).map_err(|e| e.0)?;
178 } else {
179 let Ok(permit) = sem.try_acquire_owned() else {
180 return Err(message)
181 };
182 let message_with_permit = MessageWithPermit { message, permit };
183 tx_regular
184 .send(message_with_permit)
185 .map_err(|e| e.0.message)?;
186 }
187 Ok(())
188 }
189
190 pub(crate) fn sys_send(&self, sys_msg: S) -> Result<(), S> {
191 let tx_system = match self {
192 Self::Actor(a) => &a.tx_system,
193 Self::Network(n) => &n.tx_system,
194 };
195 tx_system.send(sys_msg).map_err(|e| e.0)
196 }
197}
198
199impl<S, M> Clone for Node<S, M> {
200 fn clone(&self) -> Self {
201 match self {
202 Self::Actor(a) => Self::Actor(a.clone()),
203 Self::Network(n) => Self::Network(n.clone()),
204 }
205 }
206}
207
208impl<S, M> Default for Registry<S, M> {
209 fn default() -> Self {
210 Self {
211 networks: Default::default(),
212 }
213 }
214}