// cdrs_tokio/cluster/topology/node.rs

1use atomic::Atomic;
2use cassandra_protocol::error::{Error, Result};
3use cassandra_protocol::frame::Envelope;
4use std::fmt::{Debug, Formatter};
5use std::net::SocketAddr;
6use std::sync::atomic::Ordering;
7use std::sync::Arc;
8use tokio::sync::mpsc::Sender;
9use tokio::sync::OnceCell;
10use tracing::*;
11use uuid::Uuid;
12
13use crate::cluster::connection_pool::{ConnectionPool, ConnectionPoolFactory};
14use crate::cluster::topology::{NodeDistance, NodeState};
15use crate::cluster::Murmur3Token;
16use crate::cluster::{ConnectionManager, NodeInfo};
17use crate::transport::CdrsTransport;
18
/// Metadata about a Cassandra node in the cluster, along with a connection.
pub struct Node<T: CdrsTransport + 'static, CM: ConnectionManager<T> + 'static> {
    /// Factory used to build the connection pool on first use.
    connection_pool_factory: Arc<ConnectionPoolFactory<T, CM>>,
    /// Lazily-initialized pool; populated on the first `persistent_connection` call.
    connection_pool: OnceCell<Arc<ConnectionPool<T, CM>>>,
    /// Client-facing address clients are expected to connect to.
    broadcast_rpc_address: SocketAddr,
    /// Address other nodes use to talk to this node, if known.
    broadcast_address: Option<SocketAddr>,
    /// Distance in relation to the driver; `None` means unknown/ignored.
    distance: Option<NodeDistance>,
    /// Current node state, mutated atomically by mark_up/mark_down/force_down.
    state: Atomic<NodeState>,
    /// Host ID assigned by Cassandra; stable across IP changes, if known.
    host_id: Option<Uuid>,
    /// Tokens owned by this node on the ring.
    tokens: Vec<Murmur3Token>,
    /// Rack the node belongs to.
    rack: String,
    /// Datacenter the node belongs to.
    datacenter: String,
}
32
33impl<T: CdrsTransport, CM: ConnectionManager<T>> Debug for Node<T, CM> {
34    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
35        f.debug_struct("Node")
36            .field("broadcast_rpc_address", &self.broadcast_rpc_address)
37            .field("broadcast_address", &self.broadcast_address)
38            .field("distance", &self.distance)
39            .field("state", &self.state)
40            .field("host_id", &self.host_id)
41            .field("tokens", &self.tokens)
42            .field("rack", &self.rack)
43            .field("datacenter", &self.datacenter)
44            .finish()
45    }
46}
47
48impl<T: CdrsTransport, CM: ConnectionManager<T>> Node<T, CM> {
49    #[allow(clippy::too_many_arguments)]
50    pub(crate) fn new(
51        connection_pool_factory: Arc<ConnectionPoolFactory<T, CM>>,
52        broadcast_rpc_address: SocketAddr,
53        broadcast_address: Option<SocketAddr>,
54        host_id: Option<Uuid>,
55        distance: Option<NodeDistance>,
56        tokens: Vec<Murmur3Token>,
57        rack: String,
58        datacenter: String,
59    ) -> Self {
60        Self {
61            connection_pool_factory,
62            connection_pool: Default::default(),
63            broadcast_rpc_address,
64            broadcast_address,
65            distance,
66            state: Atomic::new(NodeState::Unknown),
67            host_id,
68            tokens,
69            rack,
70            datacenter,
71        }
72    }
73
74    #[allow(clippy::too_many_arguments)]
75    pub(crate) fn new_with_state(
76        connection_pool_factory: Arc<ConnectionPoolFactory<T, CM>>,
77        broadcast_rpc_address: SocketAddr,
78        broadcast_address: Option<SocketAddr>,
79        host_id: Option<Uuid>,
80        distance: Option<NodeDistance>,
81        state: NodeState,
82        tokens: Vec<Murmur3Token>,
83        rack: String,
84        datacenter: String,
85    ) -> Self {
86        Self {
87            connection_pool_factory,
88            connection_pool: Default::default(),
89            broadcast_rpc_address,
90            broadcast_address,
91            distance,
92            state: Atomic::new(state),
93            host_id,
94            tokens,
95            rack,
96            datacenter,
97        }
98    }
99
100    #[allow(clippy::too_many_arguments)]
101    pub(crate) fn with_state(
102        connection_pool_factory: Arc<ConnectionPoolFactory<T, CM>>,
103        broadcast_rpc_address: SocketAddr,
104        broadcast_address: Option<SocketAddr>,
105        host_id: Option<Uuid>,
106        state: NodeState,
107        tokens: Vec<Murmur3Token>,
108        rack: String,
109        datacenter: String,
110    ) -> Self {
111        Self {
112            connection_pool_factory,
113            connection_pool: Default::default(),
114            broadcast_rpc_address,
115            broadcast_address,
116            distance: None,
117            state: Atomic::new(state),
118            host_id,
119            tokens,
120            rack,
121            datacenter,
122        }
123    }
124
125    #[cfg(test)]
126    pub(crate) fn with_distance(
127        connection_pool_factory: Arc<ConnectionPoolFactory<T, CM>>,
128        broadcast_rpc_address: SocketAddr,
129        broadcast_address: Option<SocketAddr>,
130        host_id: Option<Uuid>,
131        distance: NodeDistance,
132    ) -> Self {
133        Self {
134            connection_pool_factory,
135            connection_pool: Default::default(),
136            broadcast_rpc_address,
137            broadcast_address,
138            distance: Some(distance),
139            state: Atomic::new(NodeState::Unknown),
140            host_id,
141            tokens: Default::default(),
142            rack: Default::default(),
143            datacenter: Default::default(),
144        }
145    }
146
147    #[inline]
148    pub fn state(&self) -> NodeState {
149        self.state.load(Ordering::Relaxed)
150    }
151
152    /// The host ID that is assigned to this node by Cassandra. This value can be used to uniquely
153    /// identify a node even when the underling IP address changes.
154    #[inline]
155    pub fn host_id(&self) -> Option<Uuid> {
156        self.host_id
157    }
158
159    /// The node's broadcast RPC address. That is, the address that the node expects clients to
160    /// connect to.
161    #[inline]
162    pub fn broadcast_rpc_address(&self) -> SocketAddr {
163        self.broadcast_rpc_address
164    }
165
166    /// The node's broadcast address. That is, the address that other nodes use to communicate with
167    /// that node.
168    #[inline]
169    pub fn broadcast_address(&self) -> Option<SocketAddr> {
170        self.broadcast_address
171    }
172
173    /// Returns tokens associated with the node.
174    #[inline]
175    pub fn tokens(&self) -> &[Murmur3Token] {
176        &self.tokens
177    }
178
179    /// Returns the dc the node is in.
180    #[inline]
181    pub fn datacenter(&self) -> &str {
182        &self.datacenter
183    }
184
185    /// Returns the rack the node is in.
186    #[inline]
187    pub fn rack(&self) -> &str {
188        &self.rack
189    }
190
191    /// Returns a connection to given node.
192    #[inline]
193    pub async fn persistent_connection(self: &Arc<Self>) -> Result<Arc<T>> {
194        let pool = self
195            .connection_pool
196            .get_or_try_init(|| {
197                debug!(?self.host_id, "Creating connection pool");
198
199                self.connection_pool_factory.create(
200                    self.distance.unwrap_or(NodeDistance::Remote),
201                    self.broadcast_rpc_address,
202                    Arc::downgrade(self),
203                )
204            })
205            .await;
206
207        let pool = match pool {
208            Ok(pool) => pool,
209            Err(Error::InvalidProtocol(addr)) => {
210                // we can't connect to this node even if it's up
211                self.force_down();
212                return Err(Error::InvalidProtocol(addr));
213            }
214            Err(error) => return Err(error),
215        };
216
217        pool.connection().await
218    }
219
220    /// Checks if any connection is still available.
221    pub async fn is_any_connection_up(&self) -> bool {
222        if let Some(pool) = self.connection_pool.get() {
223            pool.is_any_connection_up().await
224        } else {
225            false
226        }
227    }
228
229    /// Creates a new connection to the node with optional event and error handlers.
230    pub async fn new_connection(
231        &self,
232        event_handler: Option<Sender<Envelope>>,
233        error_handler: Option<Sender<Error>>,
234    ) -> Result<T> {
235        debug!("Establishing new connection to node...");
236        self.connection_pool_factory
237            .connection_manager()
238            .connection(event_handler, error_handler, self.broadcast_rpc_address)
239            .await
240    }
241
242    /// Returns node distance in relation to the driver, if available.
243    #[inline]
244    pub fn distance(&self) -> Option<NodeDistance> {
245        self.distance
246    }
247
248    /// Checks if the node is local in relation to the driver.
249    #[inline]
250    pub fn is_local(&self) -> bool {
251        self.distance == Some(NodeDistance::Local)
252    }
253
254    /// Checks if the node is remote in relation to the driver.
255    #[inline]
256    pub fn is_remote(&self) -> bool {
257        self.distance == Some(NodeDistance::Remote)
258    }
259
260    /// Should this node be ignored from establishing connections.
261    #[inline]
262    pub fn is_ignored(&self) -> bool {
263        self.distance.is_none() || self.state.load(Ordering::Relaxed) != NodeState::Up
264    }
265
266    pub(crate) fn force_down(&self) {
267        self.state.store(NodeState::ForcedDown, Ordering::Relaxed);
268    }
269
270    pub(crate) fn mark_down(&self) {
271        self.state.store(NodeState::Down, Ordering::Relaxed);
272    }
273
274    pub(crate) fn mark_up(&self) {
275        self.state.store(NodeState::Up, Ordering::Relaxed);
276    }
277
278    #[inline]
279    pub(crate) fn clone_with_node_info(&self, node_info: NodeInfo) -> Self {
280        let address_changed = self
281            .broadcast_address
282            .map(|address| address != node_info.broadcast_rpc_address)
283            // if we don't know the previous address, we'll trust whoever inserted the node to
284            // know its state
285            .unwrap_or(false);
286
287        let mut new_node_state = self.state.load(Ordering::Relaxed);
288        if address_changed {
289            new_node_state = NodeState::Unknown;
290        }
291
292        // If we recreate the node with the status Down, it will be removed from the load-balancing strategy.
293        // The only way to promote the node back to the Up state is to receive an error in
294        // connection_pool.rs::monitor_connections and schedule the reconnection. This method
295        // is triggered only when we call persistent_connection on the node, which means that the node
296        // must be a part of the load-balancing strategy at least once.
297        // We do not care about it for Unknown and ForcedDown, because these will be taken care of
298        // on topology events in control_connection.rs::process_events or in load balancers
299        if new_node_state == NodeState::Down {
300            debug!(
301                ?node_info.broadcast_rpc_address,
302                "Cloned the node with Down state",
303            );
304            new_node_state = NodeState::Up;
305        }
306
307        Self {
308            connection_pool_factory: self.connection_pool_factory.clone(),
309            connection_pool: Default::default(),
310            broadcast_rpc_address: node_info.broadcast_rpc_address,
311            broadcast_address: node_info.broadcast_address,
312            // since address could change, we can't be sure of distance or state
313            distance: if address_changed { None } else { self.distance },
314            state: Atomic::new(new_node_state),
315            host_id: Some(node_info.host_id),
316            tokens: node_info.tokens,
317            rack: node_info.rack,
318            datacenter: node_info.datacenter,
319        }
320    }
321
322    #[inline]
323    pub(crate) fn clone_as_contact_point(&self, node_info: NodeInfo) -> Self {
324        // control points might have valid state already, so no need to reset
325        Self {
326            connection_pool_factory: self.connection_pool_factory.clone(),
327            connection_pool: self.connection_pool.clone(),
328            broadcast_rpc_address: self.broadcast_rpc_address,
329            broadcast_address: node_info.broadcast_address,
330            distance: self.distance,
331            state: Atomic::new(self.state.load(Ordering::Relaxed)),
332            host_id: Some(node_info.host_id),
333            tokens: node_info.tokens,
334            rack: node_info.rack,
335            datacenter: node_info.datacenter,
336        }
337    }
338
339    #[inline]
340    pub(crate) fn clone_with_node_info_and_state(
341        &self,
342        node_info: NodeInfo,
343        state: NodeState,
344    ) -> Self {
345        Self {
346            connection_pool_factory: self.connection_pool_factory.clone(),
347            connection_pool: Default::default(),
348            broadcast_rpc_address: node_info.broadcast_rpc_address,
349            broadcast_address: node_info.broadcast_address,
350            // since address could change, we can't be sure of distance
351            distance: None,
352            state: Atomic::new(state),
353            host_id: Some(node_info.host_id),
354            tokens: node_info.tokens,
355            rack: node_info.rack,
356            datacenter: node_info.datacenter,
357        }
358    }
359
360    #[inline]
361    pub(crate) fn clone_with_node_state(&self, state: NodeState) -> Self {
362        Self {
363            connection_pool_factory: self.connection_pool_factory.clone(),
364            connection_pool: Default::default(),
365            broadcast_rpc_address: self.broadcast_rpc_address,
366            broadcast_address: self.broadcast_address,
367            distance: self.distance,
368            state: Atomic::new(state),
369            host_id: self.host_id,
370            tokens: self.tokens.clone(),
371            rack: self.rack.clone(),
372            datacenter: self.datacenter.clone(),
373        }
374    }
375}