1use atomic::Atomic;
2use cassandra_protocol::error::{Error, Result};
3use cassandra_protocol::frame::Envelope;
4use std::fmt::{Debug, Formatter};
5use std::net::SocketAddr;
6use std::sync::atomic::Ordering;
7use std::sync::Arc;
8use tokio::sync::mpsc::Sender;
9use tokio::sync::OnceCell;
10use tracing::*;
11use uuid::Uuid;
12
13use crate::cluster::connection_pool::{ConnectionPool, ConnectionPoolFactory};
14use crate::cluster::topology::{NodeDistance, NodeState};
15use crate::cluster::Murmur3Token;
16use crate::cluster::{ConnectionManager, NodeInfo};
17use crate::transport::CdrsTransport;
18
19pub struct Node<T: CdrsTransport + 'static, CM: ConnectionManager<T> + 'static> {
21 connection_pool_factory: Arc<ConnectionPoolFactory<T, CM>>,
22 connection_pool: OnceCell<Arc<ConnectionPool<T, CM>>>,
23 broadcast_rpc_address: SocketAddr,
24 broadcast_address: Option<SocketAddr>,
25 distance: Option<NodeDistance>,
26 state: Atomic<NodeState>,
27 host_id: Option<Uuid>,
28 tokens: Vec<Murmur3Token>,
29 rack: String,
30 datacenter: String,
31}
32
33impl<T: CdrsTransport, CM: ConnectionManager<T>> Debug for Node<T, CM> {
34 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
35 f.debug_struct("Node")
36 .field("broadcast_rpc_address", &self.broadcast_rpc_address)
37 .field("broadcast_address", &self.broadcast_address)
38 .field("distance", &self.distance)
39 .field("state", &self.state)
40 .field("host_id", &self.host_id)
41 .field("tokens", &self.tokens)
42 .field("rack", &self.rack)
43 .field("datacenter", &self.datacenter)
44 .finish()
45 }
46}
47
48impl<T: CdrsTransport, CM: ConnectionManager<T>> Node<T, CM> {
49 #[allow(clippy::too_many_arguments)]
50 pub(crate) fn new(
51 connection_pool_factory: Arc<ConnectionPoolFactory<T, CM>>,
52 broadcast_rpc_address: SocketAddr,
53 broadcast_address: Option<SocketAddr>,
54 host_id: Option<Uuid>,
55 distance: Option<NodeDistance>,
56 tokens: Vec<Murmur3Token>,
57 rack: String,
58 datacenter: String,
59 ) -> Self {
60 Self {
61 connection_pool_factory,
62 connection_pool: Default::default(),
63 broadcast_rpc_address,
64 broadcast_address,
65 distance,
66 state: Atomic::new(NodeState::Unknown),
67 host_id,
68 tokens,
69 rack,
70 datacenter,
71 }
72 }
73
74 #[allow(clippy::too_many_arguments)]
75 pub(crate) fn new_with_state(
76 connection_pool_factory: Arc<ConnectionPoolFactory<T, CM>>,
77 broadcast_rpc_address: SocketAddr,
78 broadcast_address: Option<SocketAddr>,
79 host_id: Option<Uuid>,
80 distance: Option<NodeDistance>,
81 state: NodeState,
82 tokens: Vec<Murmur3Token>,
83 rack: String,
84 datacenter: String,
85 ) -> Self {
86 Self {
87 connection_pool_factory,
88 connection_pool: Default::default(),
89 broadcast_rpc_address,
90 broadcast_address,
91 distance,
92 state: Atomic::new(state),
93 host_id,
94 tokens,
95 rack,
96 datacenter,
97 }
98 }
99
100 #[allow(clippy::too_many_arguments)]
101 pub(crate) fn with_state(
102 connection_pool_factory: Arc<ConnectionPoolFactory<T, CM>>,
103 broadcast_rpc_address: SocketAddr,
104 broadcast_address: Option<SocketAddr>,
105 host_id: Option<Uuid>,
106 state: NodeState,
107 tokens: Vec<Murmur3Token>,
108 rack: String,
109 datacenter: String,
110 ) -> Self {
111 Self {
112 connection_pool_factory,
113 connection_pool: Default::default(),
114 broadcast_rpc_address,
115 broadcast_address,
116 distance: None,
117 state: Atomic::new(state),
118 host_id,
119 tokens,
120 rack,
121 datacenter,
122 }
123 }
124
125 #[cfg(test)]
126 pub(crate) fn with_distance(
127 connection_pool_factory: Arc<ConnectionPoolFactory<T, CM>>,
128 broadcast_rpc_address: SocketAddr,
129 broadcast_address: Option<SocketAddr>,
130 host_id: Option<Uuid>,
131 distance: NodeDistance,
132 ) -> Self {
133 Self {
134 connection_pool_factory,
135 connection_pool: Default::default(),
136 broadcast_rpc_address,
137 broadcast_address,
138 distance: Some(distance),
139 state: Atomic::new(NodeState::Unknown),
140 host_id,
141 tokens: Default::default(),
142 rack: Default::default(),
143 datacenter: Default::default(),
144 }
145 }
146
147 #[inline]
148 pub fn state(&self) -> NodeState {
149 self.state.load(Ordering::Relaxed)
150 }
151
152 #[inline]
155 pub fn host_id(&self) -> Option<Uuid> {
156 self.host_id
157 }
158
159 #[inline]
162 pub fn broadcast_rpc_address(&self) -> SocketAddr {
163 self.broadcast_rpc_address
164 }
165
166 #[inline]
169 pub fn broadcast_address(&self) -> Option<SocketAddr> {
170 self.broadcast_address
171 }
172
173 #[inline]
175 pub fn tokens(&self) -> &[Murmur3Token] {
176 &self.tokens
177 }
178
179 #[inline]
181 pub fn datacenter(&self) -> &str {
182 &self.datacenter
183 }
184
185 #[inline]
187 pub fn rack(&self) -> &str {
188 &self.rack
189 }
190
191 #[inline]
193 pub async fn persistent_connection(self: &Arc<Self>) -> Result<Arc<T>> {
194 let pool = self
195 .connection_pool
196 .get_or_try_init(|| {
197 debug!(?self.host_id, "Creating connection pool");
198
199 self.connection_pool_factory.create(
200 self.distance.unwrap_or(NodeDistance::Remote),
201 self.broadcast_rpc_address,
202 Arc::downgrade(self),
203 )
204 })
205 .await;
206
207 let pool = match pool {
208 Ok(pool) => pool,
209 Err(Error::InvalidProtocol(addr)) => {
210 self.force_down();
212 return Err(Error::InvalidProtocol(addr));
213 }
214 Err(error) => return Err(error),
215 };
216
217 pool.connection().await
218 }
219
220 pub async fn is_any_connection_up(&self) -> bool {
222 if let Some(pool) = self.connection_pool.get() {
223 pool.is_any_connection_up().await
224 } else {
225 false
226 }
227 }
228
229 pub async fn new_connection(
231 &self,
232 event_handler: Option<Sender<Envelope>>,
233 error_handler: Option<Sender<Error>>,
234 ) -> Result<T> {
235 debug!("Establishing new connection to node...");
236 self.connection_pool_factory
237 .connection_manager()
238 .connection(event_handler, error_handler, self.broadcast_rpc_address)
239 .await
240 }
241
242 #[inline]
244 pub fn distance(&self) -> Option<NodeDistance> {
245 self.distance
246 }
247
248 #[inline]
250 pub fn is_local(&self) -> bool {
251 self.distance == Some(NodeDistance::Local)
252 }
253
254 #[inline]
256 pub fn is_remote(&self) -> bool {
257 self.distance == Some(NodeDistance::Remote)
258 }
259
260 #[inline]
262 pub fn is_ignored(&self) -> bool {
263 self.distance.is_none() || self.state.load(Ordering::Relaxed) != NodeState::Up
264 }
265
266 pub(crate) fn force_down(&self) {
267 self.state.store(NodeState::ForcedDown, Ordering::Relaxed);
268 }
269
270 pub(crate) fn mark_down(&self) {
271 self.state.store(NodeState::Down, Ordering::Relaxed);
272 }
273
274 pub(crate) fn mark_up(&self) {
275 self.state.store(NodeState::Up, Ordering::Relaxed);
276 }
277
278 #[inline]
279 pub(crate) fn clone_with_node_info(&self, node_info: NodeInfo) -> Self {
280 let address_changed = self
281 .broadcast_address
282 .map(|address| address != node_info.broadcast_rpc_address)
283 .unwrap_or(false);
286
287 let mut new_node_state = self.state.load(Ordering::Relaxed);
288 if address_changed {
289 new_node_state = NodeState::Unknown;
290 }
291
292 if new_node_state == NodeState::Down {
300 debug!(
301 ?node_info.broadcast_rpc_address,
302 "Cloned the node with Down state",
303 );
304 new_node_state = NodeState::Up;
305 }
306
307 Self {
308 connection_pool_factory: self.connection_pool_factory.clone(),
309 connection_pool: Default::default(),
310 broadcast_rpc_address: node_info.broadcast_rpc_address,
311 broadcast_address: node_info.broadcast_address,
312 distance: if address_changed { None } else { self.distance },
314 state: Atomic::new(new_node_state),
315 host_id: Some(node_info.host_id),
316 tokens: node_info.tokens,
317 rack: node_info.rack,
318 datacenter: node_info.datacenter,
319 }
320 }
321
322 #[inline]
323 pub(crate) fn clone_as_contact_point(&self, node_info: NodeInfo) -> Self {
324 Self {
326 connection_pool_factory: self.connection_pool_factory.clone(),
327 connection_pool: self.connection_pool.clone(),
328 broadcast_rpc_address: self.broadcast_rpc_address,
329 broadcast_address: node_info.broadcast_address,
330 distance: self.distance,
331 state: Atomic::new(self.state.load(Ordering::Relaxed)),
332 host_id: Some(node_info.host_id),
333 tokens: node_info.tokens,
334 rack: node_info.rack,
335 datacenter: node_info.datacenter,
336 }
337 }
338
339 #[inline]
340 pub(crate) fn clone_with_node_info_and_state(
341 &self,
342 node_info: NodeInfo,
343 state: NodeState,
344 ) -> Self {
345 Self {
346 connection_pool_factory: self.connection_pool_factory.clone(),
347 connection_pool: Default::default(),
348 broadcast_rpc_address: node_info.broadcast_rpc_address,
349 broadcast_address: node_info.broadcast_address,
350 distance: None,
352 state: Atomic::new(state),
353 host_id: Some(node_info.host_id),
354 tokens: node_info.tokens,
355 rack: node_info.rack,
356 datacenter: node_info.datacenter,
357 }
358 }
359
360 #[inline]
361 pub(crate) fn clone_with_node_state(&self, state: NodeState) -> Self {
362 Self {
363 connection_pool_factory: self.connection_pool_factory.clone(),
364 connection_pool: Default::default(),
365 broadcast_rpc_address: self.broadcast_rpc_address,
366 broadcast_address: self.broadcast_address,
367 distance: self.distance,
368 state: Atomic::new(state),
369 host_id: self.host_id,
370 tokens: self.tokens.clone(),
371 rack: self.rack.clone(),
372 datacenter: self.datacenter.clone(),
373 }
374 }
375}