1use std::{fmt::Display, num::ParseIntError, str::FromStr, time::Duration};
2
3use futures_core::future::BoxFuture;
4use log::LevelFilter;
5use scylla::{client::session::TlsContext, frame::Compression};
6use sqlx::{ConnectOptions, Error};
7use sqlx_core::connection::LogSettings;
8use url::Url;
9
10use crate::{ScyllaDBError, connection::ScyllaDBConnection};
11
/// Port assigned by [`ScyllaDBConnectOptions::new`] until `port` overrides it.
const DEFAULT_PORT: u16 = 9042;
/// Page size assigned by [`ScyllaDBConnectOptions::new`] until `page_size` overrides it.
const DEFAULT_PAGE_SIZE: i32 = 5000;
/// Initial `statement_cache_capacity` assigned by [`ScyllaDBConnectOptions::new`].
const DEFAULT_STATEMENT_CACHE_CAPACITY: usize = 128;
15
/// Connection options for ScyllaDB, built either from a URL or via the builder methods.
///
/// NOTE(review): the derived `Debug` output includes `password` verbatim.
#[derive(Debug, Clone)]
pub struct ScyllaDBConnectOptions {
    /// Host of the primary node; defaults to `localhost`.
    pub(crate) host: String,
    /// Port of the primary node; defaults to `DEFAULT_PORT`.
    pub(crate) port: u16,
    /// Additional node addresses, listed after `host:port` by `get_connect_nodes`.
    nodes: Vec<String>,
    /// Keyspace name; taken from the URL path when parsing a URL.
    pub(crate) keyspace: Option<String>,
    /// Statement cache capacity; initialised from `DEFAULT_STATEMENT_CACHE_CAPACITY`.
    pub(crate) statement_cache_capacity: usize,
    /// Statement logging configuration, updated by `log_statements` / `log_slow_statements`.
    pub(crate) log_settings: LogSettings,
    /// Whether the `tcp_nodelay` option was requested.
    pub(crate) tcp_nodelay: bool,
    /// User name used for authentication, if any.
    pub(crate) username: Option<String>,
    /// Password used for authentication, if any.
    pub(crate) password: Option<String>,
    /// Replication strategy, if one was configured.
    // NOTE(review): presumably consumed when a keyspace is created — confirm against the caller.
    pub(crate) replication_strategy: Option<ScyllaDBReplicationStrategy>,
    /// Replication factor; defaults to 1.
    pub(crate) replication_factor: usize,
    /// Compression algorithm, if one was configured.
    pub(crate) compression: Option<ScyllaDBCompression>,
    /// Path of the PEM file holding the root certificate.
    pub(crate) tls_rootcert: Option<String>,
    /// Path of the PEM file holding the client certificate.
    pub(crate) tls_cert: Option<String>,
    /// Path of the PEM file holding the client private key.
    pub(crate) tls_key: Option<String>,
    /// TCP keepalive interval, if one was configured.
    pub(crate) tcp_keepalive: Option<Duration>,
    /// Page size; defaults to `DEFAULT_PAGE_SIZE`.
    pub(crate) page_size: i32,
}
37
38impl ScyllaDBConnectOptions {
39 pub(crate) fn parse_from_url(url: &Url) -> Result<Self, Error> {
40 let mut options = Self::new();
41
42 if let Some(host) = url.host_str() {
43 options = options.host(host);
44 }
45
46 if let Some(port) = url.port() {
47 options = options.port(port)
48 }
49
50 let path = url.path().trim_start_matches('/');
51 if !path.is_empty() {
52 options = options.keyspace(path);
53 }
54
55 let username = url.username();
56 if !username.is_empty() {
57 options = options.username(username);
58 if let Some(password) = url.password() {
59 options = options.password(password);
60 }
61 }
62
63 let query_pairs = url.query_pairs();
64 for (key, value) in query_pairs {
65 match key.as_ref() {
66 "nodes" => {
67 let nodes = value.split(",");
68 for node in nodes {
69 options = options.add_node(node);
70 }
71 }
72 "replication_strategy" => {
73 let strategy = ScyllaDBReplicationStrategy::from_str(&value)?;
74 options = options.replication_strategy(strategy);
75 }
76 "replication_factor" => {
77 let replication_factor = value.parse().map_err(|err: ParseIntError| {
78 let message = format!("Invalid replication_factor. {err}");
79 Error::Configuration(message.into())
80 })?;
81 options = options.replication_factor(replication_factor);
82 }
83 "compression" => {
84 let compressor = ScyllaDBCompression::from_str(&value)?;
85 options = options.compresson(compressor);
86 }
87 "tcp_nodelay" => {
88 options = options.tcp_nodelay();
89 }
90 "tcp_keepalive" => {
91 let secs = value.parse().map_err(|err: ParseIntError| {
92 let message = format!("Invalid tcp_keepalive. {err}");
93 Error::Configuration(message.into())
94 })?;
95 options = options.tcp_keepalive(secs);
96 }
97 "page_size" => {
98 let page_size = value.parse().map_err(|err: ParseIntError| {
99 let message = format!("Invalid page_size. {err}");
100 Error::Configuration(message.into())
101 })?;
102 options = options.page_size(page_size);
103 }
104 "tls_rootcert" => {
105 options = options.tls_rootcert(&value);
106 }
107 "tls_cert" => {
108 options = options.tls_cert(&value);
109 }
110 "tls_key" => {
111 options = options.tls_key(&value);
112 }
113 _ => eprintln!("Not supported options. {key}"),
114 }
115 }
116
117 Ok(options)
118 }
119}
120
121impl ScyllaDBConnectOptions {
122 pub fn new() -> Self {
124 Self {
125 host: String::from("localhost"),
126 port: DEFAULT_PORT,
127 nodes: vec![],
128 keyspace: None,
129 username: None,
130 password: None,
131 replication_strategy: None,
132 replication_factor: 1,
133 compression: None,
134 tls_rootcert: None,
135 tls_cert: None,
136 tls_key: None,
137 tcp_nodelay: false,
138 tcp_keepalive: None,
139 page_size: DEFAULT_PAGE_SIZE,
140 statement_cache_capacity: DEFAULT_STATEMENT_CACHE_CAPACITY,
141 log_settings: Default::default(),
142 }
143 }
144
145 pub(crate) fn get_connect_nodes(&self) -> Vec<String> {
146 let mut nodes = Vec::with_capacity(self.nodes.len() + 1);
147 nodes.push(format!("{}:{}", self.host, self.port));
149 nodes.extend_from_slice(&self.nodes);
150 nodes
151 }
152
153 pub fn host(mut self, host: &str) -> Self {
155 host.clone_into(&mut self.host);
156 self
157 }
158
159 pub fn port(mut self, port: u16) -> Self {
161 self.port = port;
162 self
163 }
164
165 pub fn add_node(mut self, node: impl Into<String>) -> Self {
167 self.nodes.push(node.into());
168 self
169 }
170
171 pub fn keyspace(mut self, keyspace: impl Into<String>) -> Self {
173 self.keyspace = Some(keyspace.into());
174 self
175 }
176
177 pub fn username(mut self, username: &str) -> Self {
179 self.username = Some(username.to_string());
180 self
181 }
182
183 pub fn password(mut self, password: &str) -> Self {
185 self.password = Some(password.to_string());
186 self
187 }
188
189 pub fn replication_strategy(mut self, strategy: ScyllaDBReplicationStrategy) -> Self {
191 self.replication_strategy = Some(strategy);
192 self
193 }
194
195 pub fn replication_factor(mut self, factor: usize) -> Self {
197 self.replication_factor = factor;
198 self
199 }
200
201 pub fn compresson(mut self, compression: ScyllaDBCompression) -> Self {
203 self.compression = Some(compression);
204 self
205 }
206
207 pub fn tls_rootcert(mut self, root_cert: &str) -> Self {
209 self.tls_rootcert = Some(root_cert.to_string());
210 self
211 }
212
213 pub fn tls_cert(mut self, cert: &str) -> Self {
215 self.tls_cert = Some(cert.to_string());
216 self
217 }
218
219 pub fn tls_key(mut self, key: &str) -> Self {
221 self.tls_key = Some(key.to_string());
222 self
223 }
224
225 pub fn tcp_nodelay(mut self) -> Self {
227 self.tcp_nodelay = true;
228 self
229 }
230
231 pub fn tcp_keepalive(mut self, secs: u64) -> Self {
233 self.tcp_keepalive = Some(Duration::from_secs(secs));
234 self
235 }
236
237 pub fn page_size(mut self, page_size: i32) -> Self {
239 self.page_size = page_size;
240 self
241 }
242}
243
244impl ConnectOptions for ScyllaDBConnectOptions {
245 type Connection = ScyllaDBConnection;
246
247 fn from_url(url: &Url) -> Result<Self, Error> {
248 Self::parse_from_url(url)
249 }
250
251 fn to_url_lossy(&self) -> Url {
252 let mut url = Url::from_str(&format!("scylladb://{}:{}/", self.host, self.port))
253 .expect("BUG: generated un-parseable URL");
254
255 if let Some(keyspace) = &self.keyspace {
256 let _ = url.set_path(keyspace);
257 }
258
259 if let Some(username) = &self.username {
260 let _ = url.set_username(&username);
261 }
262
263 let _ = url.set_password(self.password.as_deref());
264
265 if self.nodes.len() > 0 {
266 let nodes = self.nodes.join(",");
267 url.query_pairs_mut().append_pair("nodes", &nodes);
268 }
269
270 if let Some(replication_strategy) = self.replication_strategy {
271 url.query_pairs_mut()
272 .append_pair("replication_strategy", &replication_strategy.to_string())
273 .append_pair("replication_factor", &self.replication_factor.to_string());
274 }
275
276 if let Some(compression) = self.compression {
277 url.query_pairs_mut()
278 .append_pair("compression", &compression.to_string());
279 }
280
281 if self.tcp_nodelay {
282 url.query_pairs_mut().append_key_only("tcp_nodelay");
283 }
284
285 if let Some(tcp_keepalive) = self.tcp_keepalive {
286 url.query_pairs_mut()
287 .append_pair("tcp_keepalive", &tcp_keepalive.as_secs().to_string());
288 }
289
290 url.query_pairs_mut()
291 .append_pair("page_size", &self.page_size.to_string());
292
293 if let Some(tls_rootcert) = &self.tls_rootcert {
294 url.query_pairs_mut()
295 .append_pair("tls_rootcert", &tls_rootcert);
296 }
297
298 if let Some(tls_cert) = &self.tls_cert {
299 url.query_pairs_mut().append_pair("tls_cert", &tls_cert);
300 }
301
302 if let Some(tls_key) = &self.tls_key {
303 url.query_pairs_mut().append_pair("tls_key", &tls_key);
304 }
305
306 url
307 }
308
309 fn connect(&self) -> BoxFuture<'_, Result<Self::Connection, Error>>
310 where
311 Self::Connection: Sized,
312 {
313 Box::pin(async { ScyllaDBConnection::establish(self).await })
314 }
315
316 fn log_statements(mut self, level: LevelFilter) -> Self {
317 self.log_settings.log_statements(level);
318 self
319 }
320
321 fn log_slow_statements(mut self, level: LevelFilter, duration: Duration) -> Self {
322 self.log_settings.log_slow_statements(level, duration);
323 self
324 }
325}
326
327impl FromStr for ScyllaDBConnectOptions {
328 type Err = Error;
329
330 fn from_str(s: &str) -> Result<Self, Self::Err> {
331 let url: Url = s.parse().map_err(Error::config)?;
332 Self::from_url(&url)
333 }
334}
335
/// Replication strategy that can be configured on the connection options.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ScyllaDBReplicationStrategy {
    /// Parsed from `simple` or `SimpleStrategy`; displayed as `SimpleStrategy`.
    #[default]
    SimpleStrategy,
    /// Parsed from `network_topology` or `NetworkTopologyStrategy`; displayed as
    /// `NetworkTopologyStrategy`.
    NetworkTopologyStrategy,
}
345
346impl FromStr for ScyllaDBReplicationStrategy {
347 type Err = ScyllaDBError;
348
349 fn from_str(s: &str) -> Result<Self, Self::Err> {
350 let class = match s {
351 "simple" => Self::SimpleStrategy,
352 "network_topology" => Self::NetworkTopologyStrategy,
353 "SimpleStrategy" => Self::SimpleStrategy,
354 "NetworkTopologyStrategy" => Self::NetworkTopologyStrategy,
355 _ => {
356 return Err(ScyllaDBError::ConfigurationError(format!(
357 "replication_strategy '{s}' is invalid."
358 )));
359 }
360 };
361
362 Ok(class)
363 }
364}
365
366impl Display for ScyllaDBReplicationStrategy {
367 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368 match self {
369 ScyllaDBReplicationStrategy::SimpleStrategy => write!(f, "SimpleStrategy"),
370 ScyllaDBReplicationStrategy::NetworkTopologyStrategy => {
371 write!(f, "NetworkTopologyStrategy")
372 }
373 }
374 }
375}
376
/// Compression algorithm that can be configured on the connection options.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScyllaDBCompression {
    /// Parsed from and displayed as `lz4`; maps to `Compression::Lz4`.
    LZ4Compressor,
    /// Parsed from and displayed as `snappy`; maps to `Compression::Snappy`.
    SnappyCompressor,
}
385
386impl Into<Compression> for ScyllaDBCompression {
387 fn into(self) -> Compression {
388 match self {
389 ScyllaDBCompression::LZ4Compressor => Compression::Lz4,
390 ScyllaDBCompression::SnappyCompressor => Compression::Snappy,
391 }
392 }
393}
394
395impl FromStr for ScyllaDBCompression {
396 type Err = ScyllaDBError;
397
398 fn from_str(s: &str) -> Result<Self, Self::Err> {
399 let compressor = match s.to_ascii_lowercase().as_str() {
400 "lz4" => Self::LZ4Compressor,
401 "snappy" => Self::SnappyCompressor,
402 _ => {
403 return Err(ScyllaDBError::ConfigurationError(format!(
404 "compressor '{s}' is invalid."
405 )));
406 }
407 };
408
409 Ok(compressor)
410 }
411}
412
413impl Display for ScyllaDBCompression {
414 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
415 match self {
416 ScyllaDBCompression::LZ4Compressor => write!(f, "lz4"),
417 ScyllaDBCompression::SnappyCompressor => write!(f, "snappy"),
418 }
419 }
420}
421
422impl TryInto<TlsContext> for &ScyllaDBConnectOptions {
423 type Error = sqlx::Error;
424
425 fn try_into(self) -> Result<TlsContext, Self::Error> {
426 #[cfg(feature = "openssl-010")]
427 {
428 let ssl_context: openssl_010::ssl::SslContext = self.try_into()?;
429 return Ok(TlsContext::OpenSsl010(ssl_context));
430 }
431
432 #[allow(unreachable_code)]
433 #[cfg(feature = "rustls-023")]
434 {
435 let client_config: rustls_023::ClientConfig = self.try_into()?;
436 return Ok(TlsContext::Rustls023(std::sync::Arc::new(client_config)));
437 }
438
439 #[allow(unreachable_code)]
440 Err(Error::Configuration(
441 "To enable TLS, specify the ‘openssl-010’ or ‘rustls-023’ feature.".into(),
442 ))
443 }
444}
445
/// Builds an OpenSSL context from the configured PEM files.
///
/// * `tls_rootcert`, when set, is loaded into the context's certificate store.
/// * `tls_cert` and `tls_key`, when both set, are installed as the client certificate
///   and private key. `tls_cert` without `tls_key` is a configuration error.
/// * Peer verification is enabled whenever a root certificate or a client certificate
///   is configured, and disabled only when neither is. Previously a configuration with
///   only `tls_rootcert` left verification off, so the supplied CA was never used to
///   check the server.
///
/// Implemented as `TryFrom` so the standard blanket impl also provides `TryInto`.
///
/// # Errors
///
/// Returns a configuration error when a file cannot be read, its PEM content cannot be
/// parsed, or OpenSSL rejects the certificate or key.
#[cfg(feature = "openssl-010")]
impl TryFrom<&ScyllaDBConnectOptions> for openssl_010::ssl::SslContext {
    type Error = sqlx::Error;

    fn try_from(options: &ScyllaDBConnectOptions) -> Result<Self, Self::Error> {
        use std::fs;

        use openssl_010::{
            pkey::PKey,
            ssl::{SslContextBuilder, SslMethod, SslVerifyMode},
            x509::{X509, store::X509StoreBuilder},
        };

        /// Wraps any I/O or OpenSSL failure as a configuration error.
        fn config_err<E>(err: E) -> Error
        where
            E: std::error::Error + Send + Sync + 'static,
        {
            Error::Configuration(Box::new(err))
        }

        let mut context_builder =
            SslContextBuilder::new(SslMethod::tls()).map_err(config_err)?;

        if let Some(root_cert) = &options.tls_rootcert {
            let ca_pem = fs::read(root_cert).map_err(config_err)?;
            let ca_x509 = X509::from_pem(&ca_pem).map_err(config_err)?;

            let mut store_builder = X509StoreBuilder::new().map_err(config_err)?;
            store_builder.add_cert(ca_x509).map_err(config_err)?;
            context_builder.set_cert_store(store_builder.build());
        }

        let has_client_cert = match (&options.tls_cert, &options.tls_key) {
            (Some(cert), Some(key)) => {
                let cert_pem = fs::read(cert).map_err(config_err)?;
                let cert_x509 = X509::from_pem(&cert_pem).map_err(config_err)?;
                context_builder
                    .set_certificate(&cert_x509)
                    .map_err(config_err)?;

                let key_pem = fs::read(key).map_err(config_err)?;
                let pkey = PKey::private_key_from_pem(&key_pem).map_err(config_err)?;
                context_builder.set_private_key(&pkey).map_err(config_err)?;

                true
            }
            (Some(_), None) => {
                return Err(Error::Configuration(
                    "Client private key is required.".into(),
                ));
            }
            (None, _) => false,
        };

        // A configured CA is only meaningful if the peer certificate is checked against it.
        let verify_mode = if options.tls_rootcert.is_some() || has_client_cert {
            SslVerifyMode::PEER
        } else {
            SslVerifyMode::NONE
        };
        context_builder.set_verify(verify_mode);

        Ok(context_builder.build())
    }
}
527
/// Builds a rustls client configuration from the configured PEM files.
///
/// `tls_rootcert` is mandatory and becomes the only trusted root. `tls_cert` and
/// `tls_key`, when both set, enable client authentication. `tls_cert` without `tls_key`
/// is a configuration error, and without `tls_cert` no client authentication is used.
///
/// Implemented as `TryFrom` so the standard blanket impl also provides `TryInto`.
///
/// # Errors
///
/// Returns a configuration error when the root certificate is missing, a file cannot be
/// read as PEM, or rustls rejects the certificate or key.
#[cfg(feature = "rustls-023")]
impl TryFrom<&ScyllaDBConnectOptions> for rustls_023::ClientConfig {
    type Error = sqlx::Error;

    fn try_from(options: &ScyllaDBConnectOptions) -> Result<Self, Self::Error> {
        use rustls_023::{
            ClientConfig, RootCertStore,
            pki_types::{CertificateDer, PrivateKeyDer, pem::PemObject},
        };

        let builder = ClientConfig::builder();

        let Some(root_cert) = &options.tls_rootcert else {
            return Err(Error::Configuration(
                "Root certification file is required.".into(),
            ));
        };

        let rustls_ca = CertificateDer::from_pem_file(root_cert)
            .map_err(|e| Error::Configuration(Box::new(e)))?;
        let mut root_store = RootCertStore::empty();
        root_store
            .add(rustls_ca)
            .map_err(|e| Error::Configuration(Box::new(e)))?;
        let builder = builder.with_root_certificates(root_store);

        match (&options.tls_cert, &options.tls_key) {
            (Some(cert), Some(key)) => {
                let client_cert = CertificateDer::from_pem_file(cert)
                    .map_err(|e| Error::Configuration(Box::new(e)))?;
                let priv_key = PrivateKeyDer::from_pem_file(key)
                    .map_err(|e| Error::Configuration(Box::new(e)))?;

                builder
                    .with_client_auth_cert(vec![client_cert], priv_key)
                    .map_err(|e| Error::Configuration(Box::new(e)))
            }
            (Some(_), None) => Err(Error::Configuration(
                "Client private key is required.".into(),
            )),
            (None, _) => Ok(builder.with_no_client_auth()),
        }
    }
}
576
#[cfg(test)]
mod tests {
    use std::{str::FromStr, time::Duration};

    use claims::{assert_none, assert_some_eq};
    use sqlx::ConnectOptions;

    use crate::{
        ScyllaDBConnectOptions,
        options::{ScyllaDBCompression, ScyllaDBReplicationStrategy},
    };

    /// URL exercising every supported component and query key.
    const URL: &str = "scylladb://my_name:my_passwd@localhost/my_keyspace?nodes=example.test,example2.test:9043&tcp_nodelay&tcp_keepalive=40&compression=lz4&replication_strategy=simple&replication_factor=2&page_size=10&tls_rootcert=/etc/tls/root.pem&tls_cert=/etc/tls/client.pem&tls_key=/etc/tls/client.key";

    #[test]
    fn it_can_parse_url() -> anyhow::Result<()> {
        let options: ScyllaDBConnectOptions = URL.parse()?;

        assert_some_eq!(options.keyspace, "my_keyspace");
        assert!(options.tcp_nodelay);
        assert_some_eq!(options.tcp_keepalive, Duration::from_secs(40));

        assert_some_eq!(options.username, "my_name");
        assert_some_eq!(options.password, "my_passwd");

        // `nodes` holds only the extra nodes; the primary `host:port` is prepended by
        // `get_connect_nodes`, which is what the expected list describes.
        assert_eq!(
            vec!["localhost:9042", "example.test", "example2.test:9043"],
            options.get_connect_nodes()
        );

        assert_some_eq!(options.compression, ScyllaDBCompression::LZ4Compressor);

        assert_some_eq!(
            options.replication_strategy,
            ScyllaDBReplicationStrategy::SimpleStrategy
        );
        assert_eq!(options.replication_factor, 2);

        let page_size = options.page_size;
        assert_eq!(10, page_size);

        assert_some_eq!(options.tls_rootcert, "/etc/tls/root.pem");
        assert_some_eq!(options.tls_cert, "/etc/tls/client.pem");
        assert_some_eq!(options.tls_key, "/etc/tls/client.key");

        Ok(())
    }

    #[test]
    fn it_can_convert_options_to_url() -> anyhow::Result<()> {
        let options: ScyllaDBConnectOptions = URL.parse()?;

        let url = options.to_url_lossy();

        assert_eq!(
            url.to_string(),
            "scylladb://my_name:my_passwd@localhost:9042/my_keyspace?nodes=example.test%2Cexample2.test%3A9043&replication_strategy=SimpleStrategy&replication_factor=2&compression=lz4&tcp_nodelay&tcp_keepalive=40&page_size=10&tls_rootcert=%2Fetc%2Ftls%2Froot.pem&tls_cert=%2Fetc%2Ftls%2Fclient.pem&tls_key=%2Fetc%2Ftls%2Fclient.key"
        );

        Ok(())
    }

    #[test]
    fn it_can_add_node() -> anyhow::Result<()> {
        let options = ScyllaDBConnectOptions::new();

        assert_eq!(options.nodes.len(), 0);

        let options = options.add_node("example1.test:9043");

        assert_eq!(options.nodes, vec!["example1.test:9043"]);

        Ok(())
    }

    #[test]
    fn it_can_set_keyspace() -> anyhow::Result<()> {
        let options = ScyllaDBConnectOptions::new();

        assert_none!(&options.keyspace);

        let options = options.keyspace("test");

        assert_some_eq!(options.keyspace, "test");

        Ok(())
    }

    #[test]
    fn it_can_set_username() -> anyhow::Result<()> {
        let options = ScyllaDBConnectOptions::new();

        assert_none!(&options.username);

        let options = options.username("my_name");

        assert_some_eq!(options.username, "my_name");

        Ok(())
    }

    #[test]
    fn it_can_set_password() -> anyhow::Result<()> {
        let options = ScyllaDBConnectOptions::new();

        assert_none!(&options.password);

        let options = options.password("my_password");

        assert_some_eq!(options.password, "my_password");

        Ok(())
    }

    #[test]
    fn it_can_set_replication_strategy() -> anyhow::Result<()> {
        let options = ScyllaDBConnectOptions::new();

        assert_none!(options.replication_strategy);

        let options =
            options.replication_strategy(ScyllaDBReplicationStrategy::NetworkTopologyStrategy);

        assert_some_eq!(
            options.replication_strategy,
            ScyllaDBReplicationStrategy::NetworkTopologyStrategy,
        );
        assert_eq!(options.replication_factor, 1);

        Ok(())
    }

    #[test]
    fn it_can_parse_replication_strategy_from_str() -> anyhow::Result<()> {
        assert_eq!(
            ScyllaDBReplicationStrategy::SimpleStrategy,
            ScyllaDBReplicationStrategy::from_str("simple")?
        );

        assert_eq!(
            ScyllaDBReplicationStrategy::SimpleStrategy,
            ScyllaDBReplicationStrategy::from_str("SimpleStrategy")?
        );

        assert_eq!(
            ScyllaDBReplicationStrategy::NetworkTopologyStrategy,
            ScyllaDBReplicationStrategy::from_str("network_topology")?
        );

        assert_eq!(
            ScyllaDBReplicationStrategy::NetworkTopologyStrategy,
            ScyllaDBReplicationStrategy::from_str("NetworkTopologyStrategy")?
        );

        Ok(())
    }

    #[test]
    fn it_can_set_replication_factor() -> anyhow::Result<()> {
        let options = ScyllaDBConnectOptions::new();

        assert_eq!(options.replication_factor, 1);

        let options = options.replication_factor(2);

        assert_eq!(options.replication_factor, 2);

        Ok(())
    }

    #[test]
    fn it_can_set_compression() -> anyhow::Result<()> {
        let options = ScyllaDBConnectOptions::new();

        assert_none!(options.compression);

        let options = options.compresson(ScyllaDBCompression::SnappyCompressor);

        assert_some_eq!(options.compression, ScyllaDBCompression::SnappyCompressor);

        Ok(())
    }

    #[test]
    fn it_can_set_tcp_nodelay() -> anyhow::Result<()> {
        let options = ScyllaDBConnectOptions::new();

        assert!(!options.tcp_nodelay);

        let options = options.tcp_nodelay();

        assert!(options.tcp_nodelay);

        Ok(())
    }

    #[test]
    fn it_can_set_tcp_keepalive() -> anyhow::Result<()> {
        let options = ScyllaDBConnectOptions::new();

        assert!(options.tcp_keepalive.is_none());

        let options = options.tcp_keepalive(20);

        assert_some_eq!(options.tcp_keepalive, Duration::from_secs(20));

        Ok(())
    }

    #[test]
    fn it_can_set_page_size() -> anyhow::Result<()> {
        let options = ScyllaDBConnectOptions::new();

        assert_eq!(5000, options.page_size);

        let options = options.page_size(200);

        assert_eq!(200, options.page_size);

        Ok(())
    }
}