use std::net::IpAddr;
use cdrs_tokio::types::rows::Row as CdrsRow;
use cdrs_tokio::types::{ByName, IntoRustByName};
use uuid::Uuid;
use crate::error::{CassandraError, CassandraResult};
use crate::pool::CassandraPool;
fn col_or_default<T>(row: &CdrsRow, name: &str) -> CassandraResult<T>
where
T: Default,
CdrsRow: IntoRustByName<T>,
{
Ok(ByName::by_name::<T>(row, name)
.map_err(|e| CassandraError::Query(e.to_string()))?
.unwrap_or_default())
}
pub struct VirtualTables<'a> {
pool: &'a CassandraPool,
}
impl<'a> VirtualTables<'a> {
pub fn new(pool: &'a CassandraPool) -> Self {
Self { pool }
}
pub async fn cluster_info(&self) -> CassandraResult<ClusterInfo> {
let result = self
.pool
.query("SELECT cluster_name, partitioner, release_version FROM system.local")
.await?;
let row = result
.rows
.first()
.map(|r| r.as_cdrs())
.ok_or_else(|| CassandraError::Query("system.local returned no row".into()))?;
Ok(ClusterInfo {
cluster_name: col_or_default(row, "cluster_name")?,
partitioner: col_or_default(row, "partitioner")?,
release_version: col_or_default(row, "release_version")?,
})
}
pub async fn peers(&self) -> CassandraResult<Vec<PeerInfo>> {
let result = match self
.pool
.query("SELECT peer, data_center, host_id, rack, release_version FROM system.peers_v2")
.await
{
Ok(r) => r,
Err(_) => self
.pool
.query("SELECT peer, data_center, host_id, rack, release_version FROM system.peers")
.await?,
};
result
.rows
.iter()
.map(|r| r.as_cdrs())
.map(|row| {
let peer = ByName::by_name::<IpAddr>(row, "peer")
.map_err(|e| CassandraError::Query(e.to_string()))?
.ok_or_else(|| CassandraError::Query("peer column was null".into()))?;
Ok(PeerInfo {
peer,
data_center: col_or_default(row, "data_center")?,
host_id: col_or_default::<Uuid>(row, "host_id")?,
rack: col_or_default(row, "rack")?,
release_version: col_or_default(row, "release_version")?,
})
})
.collect()
}
pub async fn settings(&self) -> CassandraResult<Vec<(String, String)>> {
let result = self
.pool
.query("SELECT name, value FROM system_views.settings")
.await?;
result
.rows
.iter()
.map(|r| r.as_cdrs())
.map(|row| {
Ok((
col_or_default::<String>(row, "name")?,
col_or_default::<String>(row, "value")?,
))
})
.collect()
}
}
#[derive(Debug, Clone)]
pub struct ClusterInfo {
pub cluster_name: String,
pub partitioner: String,
pub release_version: String,
}
#[derive(Debug, Clone)]
pub struct PeerInfo {
pub peer: IpAddr,
pub data_center: String,
pub host_id: Uuid,
pub rack: String,
pub release_version: String,
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn test_cluster_info_debug() {
let ci = ClusterInfo {
cluster_name: "Test Cluster".into(),
partitioner: "Murmur3Partitioner".into(),
release_version: "4.1.0".into(),
};
let dbg = format!("{:?}", ci);
assert!(dbg.contains("Test Cluster"));
}
#[test]
fn test_peer_info_construction() {
let pi = PeerInfo {
peer: IpAddr::from_str("192.168.1.1").unwrap(),
data_center: "dc1".into(),
host_id: Uuid::nil(),
rack: "rack1".into(),
release_version: "4.1.0".into(),
};
assert_eq!(pi.data_center, "dc1");
}
}