discord_cassandra_cpp/cassandra/
cluster.rs1use crate::cassandra::error::*;
2use crate::cassandra::future::CassFuture;
3use crate::cassandra::policy::retry::RetryPolicy;
4use crate::cassandra::session::session_scope::{Bound, Unbound};
5use crate::cassandra::session::Session;
6use crate::cassandra::ssl::Ssl;
7use crate::cassandra::time::TimestampGen;
8use crate::cassandra::util::{Protected, ProtectedInner};
9
10use crate::cassandra_sys::cass_cluster_free;
11use crate::cassandra_sys::cass_cluster_new;
12use crate::cassandra_sys::cass_cluster_set_connect_timeout;
13use crate::cassandra_sys::cass_cluster_set_connection_heartbeat_interval;
14use crate::cassandra_sys::cass_cluster_set_connection_idle_timeout;
15use crate::cassandra_sys::cass_cluster_set_constant_speculative_execution_policy;
16use crate::cassandra_sys::cass_cluster_set_contact_points_n;
17use crate::cassandra_sys::cass_cluster_set_core_connections_per_host;
18use crate::cassandra_sys::cass_cluster_set_credentials_n;
19use crate::cassandra_sys::cass_cluster_set_latency_aware_routing;
20use crate::cassandra_sys::cass_cluster_set_latency_aware_routing_settings;
21use crate::cassandra_sys::cass_cluster_set_load_balance_dc_aware_n;
22use crate::cassandra_sys::cass_cluster_set_load_balance_round_robin;
23use crate::cassandra_sys::cass_cluster_set_local_address_n;
24use crate::cassandra_sys::cass_cluster_set_max_concurrent_creation;
25use crate::cassandra_sys::cass_cluster_set_max_concurrent_requests_threshold;
26use crate::cassandra_sys::cass_cluster_set_max_connections_per_host;
27use crate::cassandra_sys::cass_cluster_set_max_requests_per_flush;
28use crate::cassandra_sys::cass_cluster_set_no_speculative_execution_policy;
29use crate::cassandra_sys::cass_cluster_set_num_threads_io;
30use crate::cassandra_sys::cass_cluster_set_pending_requests_high_water_mark;
31use crate::cassandra_sys::cass_cluster_set_pending_requests_low_water_mark;
32use crate::cassandra_sys::cass_cluster_set_port;
33use crate::cassandra_sys::cass_cluster_set_protocol_version;
34use crate::cassandra_sys::cass_cluster_set_queue_size_event;
35use crate::cassandra_sys::cass_cluster_set_queue_size_io;
36use crate::cassandra_sys::cass_cluster_set_reconnect_wait_time;
37use crate::cassandra_sys::cass_cluster_set_request_timeout;
38use crate::cassandra_sys::cass_cluster_set_retry_policy;
39use crate::cassandra_sys::cass_cluster_set_ssl;
40use crate::cassandra_sys::cass_cluster_set_tcp_keepalive;
41use crate::cassandra_sys::cass_cluster_set_tcp_nodelay;
42use crate::cassandra_sys::cass_cluster_set_timestamp_gen;
43use crate::cassandra_sys::cass_cluster_set_token_aware_routing;
44use crate::cassandra_sys::cass_cluster_set_token_aware_routing_shuffle_replicas;
45use crate::cassandra_sys::cass_cluster_set_use_schema;
46use crate::cassandra_sys::cass_cluster_set_whitelist_filtering;
47use crate::cassandra_sys::cass_cluster_set_write_bytes_high_water_mark;
48use crate::cassandra_sys::cass_cluster_set_write_bytes_low_water_mark;
49use crate::cassandra_sys::cass_false;
50use crate::cassandra_sys::cass_future_error_code;
51use crate::cassandra_sys::cass_session_connect;
52use crate::cassandra_sys::cass_session_connect_keyspace_n;
53use crate::cassandra_sys::cass_session_new;
54use crate::cassandra_sys::cass_true;
55use crate::cassandra_sys::CassCluster as _Cluster;
56
57use std::ffi::NulError;
58use std::fmt;
59use std::fmt::Display;
60use std::iter::Map;
61use std::net::AddrParseError;
62use std::net::Ipv4Addr;
63use std::os::raw::c_char;
64use std::result;
65use std::str::FromStr;
66use std::{convert::TryInto, ffi::CStr};
67use time::Duration;
68
69pub type CqlProtocol = i32;
71
72#[derive(Debug)]
87pub struct Cluster(pub *mut _Cluster);
88
89unsafe impl Send for Cluster {}
92
93impl Drop for Cluster {
94 fn drop(&mut self) {
96 unsafe { cass_cluster_free(self.0) }
97 }
98}
99
100impl ProtectedInner<*mut _Cluster> for Cluster {
101 fn inner(&self) -> *mut _Cluster {
102 self.0
103 }
104}
105
106impl Protected<*mut _Cluster> for Cluster {
107 fn build(inner: *mut _Cluster) -> Self {
108 if inner.is_null() {
109 panic!("Unexpected null pointer")
110 };
111 Cluster(inner)
112 }
113}
114
115impl Default for Cluster {
116 fn default() -> Cluster {
118 unsafe { Cluster(cass_cluster_new()) }
119 }
120}
121
122impl Cluster {
123 pub fn set_contact_points(&mut self, contact_points: &str) -> Result<&mut Self> {
133 unsafe {
134 let cp_ptr = contact_points.as_ptr() as *const c_char;
135 let err = cass_cluster_set_contact_points_n(self.0, cp_ptr, contact_points.len());
136 err.to_result(self)
137 }
138 }
139
140 pub fn set_local_address(&mut self, name: &str) -> Result<&mut Self> {
145 unsafe {
146 let name_ptr = name.as_ptr() as *const c_char;
147 let err = cass_cluster_set_local_address_n(self.0, name_ptr, name.len());
148 err.to_result(self)
149 }
150 }
151
152 pub fn set_port(&mut self, port: u16) -> Result<&mut Self> {
158 unsafe { cass_cluster_set_port(self.0, port as i32).to_result(self) }
159 }
160
161 pub fn set_ssl(&mut self, ssl: Ssl) -> &Self {
163 unsafe {
164 cass_cluster_set_ssl(self.0, ssl.inner());
165 self
166 }
167 }
168
169 pub async fn connect(&mut self) -> Result<Session<Unbound>> {
171 let session = Session::new(Unbound);
172 let connect_future = {
173 let connect = unsafe { cass_session_connect(session.inner(), self.0) };
174 CassFuture::build(session, connect)
175 };
176 connect_future.await
177 }
178
179 pub async fn connect_keyspace(&mut self, keyspace: &str) -> Result<Session> {
181 let session = Session::new(Bound::new(keyspace));
182 let keyspace_ptr = keyspace.as_ptr() as *const c_char;
183 let connect_keyspace = unsafe {
184 cass_session_connect_keyspace_n(
185 session.inner(),
186 self.inner(),
187 keyspace_ptr,
188 keyspace.len(),
189 )
190 };
191 let connect_future = CassFuture::build(session, connect_keyspace);
192 connect_future.await
193 }
194
195 pub fn set_protocol_version(&mut self, protocol_version: CqlProtocol) -> Result<&mut Self> {
202 unsafe {
203 cass_cluster_set_protocol_version(self.0, protocol_version as i32).to_result(self)
204 }
205 }
206
207 pub fn set_num_threads_io(&mut self, num_threads: u32) -> Result<&mut Self> {
214 unsafe { cass_cluster_set_num_threads_io(self.0, num_threads).to_result(self) }
215 }
216
217 pub fn set_queue_size_io(&mut self, queue_size: u32) -> Result<&mut Self> {
223 unsafe { cass_cluster_set_queue_size_io(self.0, queue_size).to_result(self) }
224 }
225
226 pub fn set_queue_size_event(&mut self, queue_size: u32) -> Result<&mut Self> {
232 unsafe { cass_cluster_set_queue_size_event(self.0, queue_size).to_result(self) }
233 }
234
235 pub fn set_core_connections_per_host(&mut self, num_connections: u32) -> Result<&mut Self> {
242 unsafe {
243 cass_cluster_set_core_connections_per_host(self.0, num_connections).to_result(self)
244 }
245 }
246
247 pub fn set_max_connections_per_host(&mut self, num_connections: u32) -> Result<&mut Self> {
254 unsafe {
255 cass_cluster_set_max_connections_per_host(self.0, num_connections).to_result(self)
256 }
257 }
258
259 pub fn set_reconnect_wait_time(&mut self, wait_time: u32) -> &Self {
265 unsafe {
266 cass_cluster_set_reconnect_wait_time(self.0, wait_time);
267 }
268 self
269 }
270
271 pub fn set_max_concurrent_creation(&mut self, num_connections: u32) -> Result<&mut Self> {
278 unsafe { cass_cluster_set_max_concurrent_creation(self.0, num_connections).to_result(self) }
279 }
280
281 pub fn set_max_concurrent_requests_threshold(
288 &mut self,
289 num_requests: u32,
290 ) -> Result<&mut Self> {
291 unsafe {
292 cass_cluster_set_max_concurrent_requests_threshold(self.0, num_requests).to_result(self)
293 }
294 }
295
296 pub fn set_max_requests_per_flush(&mut self, num_requests: u32) -> Result<&mut Self> {
302 unsafe { cass_cluster_set_max_requests_per_flush(self.0, num_requests).to_result(self) }
303 }
304
305 pub fn set_write_bytes_high_water_mark(&mut self, num_bytes: u32) -> Result<&mut Self> {
312 unsafe { cass_cluster_set_write_bytes_high_water_mark(self.0, num_bytes).to_result(self) }
313 }
314
315 pub fn set_write_bytes_low_water_mark(&mut self, num_bytes: u32) -> Result<&mut Self> {
322 unsafe { cass_cluster_set_write_bytes_low_water_mark(self.0, num_bytes).to_result(self) }
323 }
324
325 pub fn set_pending_requests_high_water_mark(&mut self, num_requests: u32) -> Result<&mut Self> {
333 unsafe {
334 cass_cluster_set_pending_requests_high_water_mark(self.0, num_requests).to_result(self)
335 }
336 }
337
338 pub fn set_pending_requests_low_water_mark(&mut self, num_requests: u32) -> Result<&mut Self> {
346 unsafe {
347 cass_cluster_set_pending_requests_low_water_mark(self.0, num_requests).to_result(self)
348 }
349 }
350
351 pub fn set_connect_timeout(&mut self, timeout: Duration) -> &Self {
356 unsafe {
357 cass_cluster_set_connect_timeout(self.0, timeout.whole_milliseconds() as u32);
358 }
359 self
360 }
361
362 pub fn set_request_timeout(&mut self, timeout: Duration) -> &Self {
367 unsafe {
368 cass_cluster_set_request_timeout(self.0, timeout.whole_milliseconds() as u32);
369 }
370 self
371 }
372
373 pub fn set_credentials(&mut self, username: &str, password: &str) -> Result<&mut Self> {
375 unsafe {
376 let username_ptr = username.as_ptr() as *const c_char;
377 let password_ptr = password.as_ptr() as *const c_char;
378 cass_cluster_set_credentials_n(
379 self.0,
380 username_ptr,
381 username.len(),
382 password_ptr,
383 password.len(),
384 );
385 }
386 Ok(self)
387 }
388
389 pub fn set_load_balance_round_robin(&mut self) -> &Self {
394 unsafe {
395 cass_cluster_set_load_balance_round_robin(self.0);
396 self
397 }
398 }
399
400 pub fn set_load_balance_dc_aware<S>(
411 &mut self,
412 local_dc: &str,
413 used_hosts_per_remote_dc: u32,
414 allow_remote_dcs_for_local_cl: bool,
415 ) -> Result<&mut Self> {
416 unsafe {
417 {
418 let local_dc_ptr = local_dc.as_ptr() as *const c_char;
419 cass_cluster_set_load_balance_dc_aware_n(
420 self.0,
421 local_dc_ptr,
422 local_dc.len(),
423 used_hosts_per_remote_dc,
424 if allow_remote_dcs_for_local_cl {
425 cass_true
426 } else {
427 cass_false
428 },
429 )
430 }
431 .to_result(self)
432 }
433 }
434
435 pub fn set_token_aware_routing(&mut self, enabled: bool) -> &Self {
449 unsafe {
450 cass_cluster_set_token_aware_routing(
451 self.0,
452 if enabled { cass_true } else { cass_false },
453 );
454 }
455 self
456 }
457
458 pub fn set_token_aware_routing_shuffle_replicas(&mut self, enabled: bool) -> &Self {
469 unsafe {
470 cass_cluster_set_token_aware_routing_shuffle_replicas(
471 self.0,
472 if enabled { cass_true } else { cass_false },
473 );
474 }
475 self
476 }
477
478 pub fn set_latency_aware_routing(&mut self, enabled: bool) -> &Self {
488 unsafe {
489 cass_cluster_set_latency_aware_routing(
490 self.0,
491 if enabled { cass_true } else { cass_false },
492 );
493 }
494 self
495 }
496
497 pub fn set_latency_aware_routing_settings(
510 &mut self,
511 exclusion_threshold: f64,
512 scale: Duration,
513 retry_period: Duration,
514 update_rate: Duration,
515 min_measured: u64,
516 ) -> &Self {
517 unsafe {
518 cass_cluster_set_latency_aware_routing_settings(
519 self.0,
520 exclusion_threshold,
521 scale.whole_milliseconds() as u64,
522 retry_period.whole_milliseconds() as u64,
523 update_rate.whole_milliseconds() as u64,
524 min_measured,
525 );
526 }
527 self
528 }
529
530 pub fn set_whitelist_filtering(&mut self, hosts: Vec<String>) -> &Self {
542 unsafe {
543 cass_cluster_set_whitelist_filtering(self.0, hosts.join(",").as_ptr() as *const i8);
544 }
545 self
546 }
547
548 pub fn set_tcp_nodelay(&mut self, enable: bool) -> &Self {
553 unsafe {
554 cass_cluster_set_tcp_nodelay(self.0, if enable { cass_true } else { cass_false });
555 }
556 self
557 }
558
559 pub fn set_tcp_keepalive(&mut self, enable: bool, delay: Duration) -> &Self {
564 unsafe {
565 cass_cluster_set_tcp_keepalive(
566 self.0,
567 if enable { cass_true } else { cass_false },
568 delay.whole_seconds() as u32,
569 );
570 }
571 self
572 }
573
574 pub fn set_timestamp_gen(&mut self, tsg: &TimestampGen) -> &mut Self {
580 unsafe {
581 cass_cluster_set_timestamp_gen(self.0, TimestampGen::inner(tsg));
582 self
583 }
584 }
585
586 pub fn set_connection_heartbeat_interval(&mut self, hearbeat: Duration) -> &mut Self {
594 unsafe {
595 cass_cluster_set_connection_heartbeat_interval(self.0, hearbeat.whole_seconds() as u32);
596 self
597 }
598 }
599
600 pub fn set_connection_idle_timeout(&mut self, timeout: Duration) -> &mut Self {
606 unsafe {
607 cass_cluster_set_connection_idle_timeout(self.0, timeout.whole_seconds() as u32);
608 self
609 }
610 }
611
612 pub fn set_retry_policy(&mut self, retry_policy: RetryPolicy) -> &mut Self {
623 unsafe {
624 cass_cluster_set_retry_policy(self.0, retry_policy.inner());
625 self
626 }
627 }
628
629 pub fn set_use_schema(&mut self, enabled: bool) -> &Self {
638 unsafe {
639 cass_cluster_set_use_schema(self.0, if enabled { cass_true } else { cass_false });
640 }
641 self
642 }
643
644 pub fn set_constant_speculative_execution_policy(
650 &self,
651 constant_delay: Duration,
652 max_speculative_executions: u8,
653 ) -> Result<&Self> {
654 unsafe {
655 cass_cluster_set_constant_speculative_execution_policy(
656 self.0,
657 constant_delay
658 .whole_milliseconds()
659 .try_into()
660 .expect("panic: duration too big."),
661 max_speculative_executions as _,
662 )
663 .to_result(self)
664 }
665 }
666
667 pub fn set_no_speculative_execution_policy(&self) -> Result<&Self> {
671 unsafe { cass_cluster_set_no_speculative_execution_policy(self.0).to_result(self) }
672 }
673}