extern crate serde;
extern crate serde_xml_rs;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::fs::File;
use std::io::{BufRead, BufReader, Cursor};
use std::net::IpAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use super::{
process_output, resolve_to_ip, run_command, translate_to_bytes, BitrotOption, BrickStatus,
GlusterError, GlusterOption, Quota,
};
use byteorder::{BigEndian, ReadBytesExt};
use peer::{get_peer, Peer, State};
use regex::Regex;
use rpc;
use rpc::{Pack, UnPack};
use unix_socket::UnixStream;
use uuid::Uuid;
/// A brick, identified by the peer it belongs to and a path.
///
/// `Debug` is implemented by hand below rather than derived.
#[derive(Clone, Eq, PartialEq)]
pub struct Brick {
/// Peer associated with this brick; its `hostname` is used when rendering the brick.
pub peer: Peer,
/// Path component of the brick.
pub path: PathBuf,
}
impl Brick {
    /// Renders the brick as `hostname:path`, the form pushed onto the gluster
    /// command line by the callers in this file.
    ///
    /// The path is converted lossily, so non-UTF-8 bytes are replaced.
    pub fn to_string(&self) -> String {
        let host = &self.peer.hostname;
        let brick_path = self.path.to_string_lossy();
        format!("{}:{}", host, brick_path)
    }
}
impl fmt::Debug for Brick {
    /// Writes `hostname:path` using the `Debug` representation of both parts;
    /// the path is shown as the `Option<&str>` returned by `Path::to_str`.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        let host = &self.peer.hostname;
        let brick_path = self.path.to_str();
        write!(formatter, "{:?}:{:?}", host, brick_path)
    }
}
/// Transport selection for a volume.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Transport {
    /// Rendered as `tcp`.
    Tcp,
    /// Rendered as `rdma`.
    Rdma,
    /// Rendered as `tcp,rdma`.
    TcpAndRdma,
}

impl Transport {
    /// Lenient constructor: surrounding whitespace and ASCII case are ignored,
    /// and any unrecognised name falls back to `Transport::Tcp`.
    fn new(name: &str) -> Transport {
        let normalized = name.trim().to_ascii_lowercase();
        if normalized == "rdma" {
            Transport::Rdma
        } else if normalized == "tcp,rdma" {
            Transport::TcpAndRdma
        } else {
            // "tcp" and every unknown value both map to Tcp.
            Transport::Tcp
        }
    }

    /// Returns the lowercase name of the transport.
    fn to_string(&self) -> String {
        let label = match *self {
            Transport::Tcp => "tcp",
            Transport::Rdma => "rdma",
            Transport::TcpAndRdma => "tcp,rdma",
        };
        label.to_string()
    }
}
impl FromStr for Transport {
    type Err = GlusterError;

    /// Strict parser: accepts the numeric codes `0`, `1`, `2` as well as the
    /// exact lowercase names `tcp`, `rdma` and `tcp,rdma`.
    ///
    /// Unlike `Transport::new`, any other input is an error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let transport = match s {
            "0" | "tcp" => Transport::Tcp,
            "1" | "rdma" => Transport::Rdma,
            "2" | "tcp,rdma" => Transport::TcpAndRdma,
            unknown => {
                let message = format!("Unknown transport string: {}", unknown);
                return Err(GlusterError::new(message));
            }
        };
        Ok(transport)
    }
}
/// Keywords passed as `<keyword> <value>` pairs when creating a volume.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum VolumeTranslator {
    Arbiter,
    Disperse,
    Replica,
    Redundancy,
    Stripe,
}

impl VolumeTranslator {
    /// Returns the lowercase keyword for this translator.
    fn to_string(&self) -> String {
        let keyword = match *self {
            VolumeTranslator::Arbiter => "arbiter",
            VolumeTranslator::Disperse => "disperse",
            VolumeTranslator::Replica => "replica",
            VolumeTranslator::Redundancy => "redundancy",
            VolumeTranslator::Stripe => "stripe",
        };
        String::from(keyword)
    }
}
/// Volume layout types.
///
/// Each variant deserializes from the capitalised, hyphen-separated name given
/// in its `serde(rename)` attribute; these match the strings accepted by
/// `VolumeType::from_str`.
#[derive(Debug, Deserialize, Eq, PartialEq)]
pub enum VolumeType {
#[serde(rename = "Arbiter")]
Arbiter,
#[serde(rename = "Distribute")]
Distribute,
#[serde(rename = "Stripe")]
Stripe,
#[serde(rename = "Replicate")]
Replicate,
#[serde(rename = "Striped-Replicate")]
StripedAndReplicate,
#[serde(rename = "Disperse")]
Disperse,
#[serde(rename = "Distributed-Stripe")]
DistributedAndStripe,
#[serde(rename = "Distributed-Replicate")]
DistributedAndReplicate,
#[serde(rename = "Distributed-Striped-Replicate")]
DistributedAndStripedAndReplicate,
#[serde(rename = "Distributed-Disperse")]
DistributedAndDisperse,
}
impl VolumeType {
    /// Lenient constructor: trims the name, ignores ASCII case, and falls back
    /// to `VolumeType::Replicate` for anything unrecognised.
    pub fn new(name: &str) -> VolumeType {
        let normalized = name.trim().to_ascii_lowercase();
        match normalized.as_str() {
            "arbiter" => VolumeType::Arbiter,
            "distribute" => VolumeType::Distribute,
            "stripe" => VolumeType::Stripe,
            "striped-replicate" => VolumeType::StripedAndReplicate,
            "disperse" => VolumeType::Disperse,
            "distributed-stripe" => VolumeType::DistributedAndStripe,
            "distributed-replicate" => VolumeType::DistributedAndReplicate,
            "distributed-striped-replicate" => VolumeType::DistributedAndStripedAndReplicate,
            "distributed-disperse" => VolumeType::DistributedAndDisperse,
            // "replicate" and every unknown value share the default.
            _ => VolumeType::Replicate,
        }
    }

    /// Exact-match constructor for the capitalised type names; no trimming or
    /// case folding is done. Unknown input yields `VolumeType::Replicate`.
    pub fn from_str(vol_type: &str) -> VolumeType {
        match vol_type {
            "Arbiter" => VolumeType::Arbiter,
            "Distribute" => VolumeType::Distribute,
            "Stripe" => VolumeType::Stripe,
            "Striped-Replicate" => VolumeType::StripedAndReplicate,
            "Disperse" => VolumeType::Disperse,
            "Distributed-Stripe" => VolumeType::DistributedAndStripe,
            "Distributed-Replicate" => VolumeType::DistributedAndReplicate,
            "Distributed-Striped-Replicate" => VolumeType::DistributedAndStripedAndReplicate,
            "Distributed-Disperse" => VolumeType::DistributedAndDisperse,
            // "Replicate" and every unknown value share the default.
            _ => VolumeType::Replicate,
        }
    }

    /// Returns the capitalised type name.
    ///
    /// NOTE(review): `Arbiter` is rendered as "Replicate", so it does not
    /// round-trip through `from_str` — confirm this is intentional.
    pub fn to_string(&self) -> String {
        let label = match *self {
            VolumeType::Arbiter | VolumeType::Replicate => "Replicate",
            VolumeType::Distribute => "Distribute",
            VolumeType::Stripe => "Stripe",
            VolumeType::StripedAndReplicate => "Striped-Replicate",
            VolumeType::Disperse => "Disperse",
            VolumeType::DistributedAndStripe => "Distributed-Stripe",
            VolumeType::DistributedAndReplicate => "Distributed-Replicate",
            VolumeType::DistributedAndStripedAndReplicate => "Distributed-Striped-Replicate",
            VolumeType::DistributedAndDisperse => "Distributed-Disperse",
        };
        String::from(label)
    }
}
/// Description of a volume as produced by `parse_volume_info`.
#[derive(Debug, Eq, PartialEq)]
pub struct Volume {
/// Value of the "Volume Name" field.
pub name: String,
/// Layout type, built from the "Type" field.
pub vol_type: VolumeType,
/// Value of the "Volume ID" field.
pub id: Uuid,
/// Value of the "Status" field, kept as free text (e.g. "Started").
pub status: String,
/// Transport used by the volume.
pub transport: Transport,
/// Bricks listed under the "Bricks:" section.
pub bricks: Vec<Brick>,
/// Key/value pairs listed under the "Options Reconfigured:" section.
pub options: BTreeMap<String, String>,
}
/// Deserialization target for a brick element with `name`, `hostUuid` and
/// `isArbiter` fields.
#[derive(Debug, Deserialize, Eq, PartialEq)]
pub struct BrickXml {
pub name: String,
#[serde(rename = "hostUuid")]
pub host_uuid: Uuid,
// Kept as the raw string from the document; not converted to a bool here.
#[serde(rename = "isArbiter")]
pub is_arbiter: String,
}
/// Deserialization target for a CLI response carrying `opRet`, `opErrno`,
/// an optional `opErrStr`, and a `volInfo` payload.
#[derive(Debug, Deserialize, Eq, PartialEq)]
pub struct VolumeCliXml {
#[serde(rename = "opRet")]
pub ret: i32,
#[serde(rename = "opErrno")]
pub errno: i32,
// Optional: may be absent from the response.
#[serde(rename = "opErrStr")]
pub err_str: Option<String>,
#[serde(rename = "volInfo")]
pub volumes: XmlVolumes,
}
/// Contents of the `volInfo` element: a single nested `volumes` element.
#[derive(Debug, Deserialize, Eq, PartialEq)]
pub struct XmlVolumes {
pub volumes: Blah,
}
/// Contents of the nested `volumes` element: the `volume` entries plus a `count`.
// NOTE(review): placeholder type name; a descriptive name would help, but
// renaming would change the public interface.
#[derive(Debug, Deserialize, Eq, PartialEq)]
pub struct Blah {
pub volume: Vec<VolumeXml>,
pub count: u64,
}
/// Deserialization target for a single `volume` element.
///
/// The count fields are kept as raw strings; only `id` (a `Uuid`) and
/// `type_str` (a `VolumeType`) are parsed into typed values.
#[derive(Debug, Deserialize, Eq, PartialEq)]
pub struct VolumeXml {
pub name: String,
pub id: Uuid,
pub status: String,
#[serde(rename = "statusStr")]
pub status_str: String,
#[serde(rename = "snapshotCount")]
pub snapshot_count: String,
#[serde(rename = "brickCount")]
pub brick_count: String,
#[serde(rename = "distCount")]
pub dist_count: String,
#[serde(rename = "stripeCount")]
pub stripe_count: String,
#[serde(rename = "replicaCount")]
pub replica_count: String,
#[serde(rename = "arbiterCount")]
pub arbiter_count: String,
#[serde(rename = "disperseCount")]
pub disperse_count: String,
// NOTE(review): the only field that is not `pub` — confirm whether that is intended.
#[serde(rename = "redundancyCount")]
redundancy_count: String,
#[serde(rename = "type")]
pub vol_type: String,
#[serde(rename = "typeStr")]
pub type_str: VolumeType,
pub transport: String,
pub xlators: Option<String>,
pub bricks: Vec<String>,
#[serde(rename = "optCount")]
pub option_count: String,
}
// Placeholder for an XML volume-info parsing test. It has no assertions yet,
// so the previously unused `File`/`Read` imports were dropped to avoid
// unused-import warnings; re-add them together with the fixture loading code.
#[test]
fn test_parse_volume_info_xml() {
    // TODO: deserialize a sample document into `VolumeCliXml` and assert on it.
}
/// Section tracker used by `parse_volume_info` while walking the output.
enum ParseState {
// Initial state: top-level `key: value` fields.
Root,
// Entered after a line equal to "Bricks:".
Bricks,
// Entered after a line equal to "Options Reconfigured:".
Options,
}
/// Runs `gluster volume list` and returns the volume names it prints.
///
/// Returns `None` when the command exits unsuccessfully or its stdout is not
/// valid UTF-8. Each output line is trimmed, and lines that are empty after
/// trimming are skipped so whitespace-only lines never yield an empty name.
pub fn volume_list() -> Option<Vec<String>> {
    let arg_list: Vec<String> = vec!["volume".to_string(), "list".to_string()];
    let output = run_command("gluster", &arg_list, true, false);
    if !output.status.success() {
        debug!("Volume list get command failed");
        return None;
    }
    let output_str: String = match String::from_utf8(output.stdout) {
        Ok(text) => text,
        Err(_) => {
            debug!("Volume list output transformation to utf8 failed");
            return None;
        }
    };
    let volume_names: Vec<String> = output_str
        .lines()
        .map(|line| line.trim())
        // Filter after trimming: the emptiness check used to run first.
        .filter(|name| !name.is_empty())
        .map(|name| name.to_string())
        .collect();
    Some(volume_names)
}
// Holds a sample of the `key=value` info-file format read by
// `parse_volume_info2`. The data is not used yet: this test makes no
// assertions and never calls the parser.
#[test]
fn test_parse_volume_info2() {
let _test_data = r#"
type=2
count=9
status=1
sub_count=3
stripe_count=1
replica_count=3
disperse_count=0
redundancy_count=0
version=9
transport-type=0
volume-id=e7d940ba-8b7c-4e37-a664-2975bc8452fc
username=b2aa4fe8-5c35-4bc0-8666-a8964e6ec884
password=4f377e97-554e-4bd5-8c58-34c3b9074db8
op-version=31000
client-op-version=30712
quota-version=1
tier-enabled=0
parent_volname=N/A
restored_from_snap=00000000-0000-0000-0000-000000000000
snap-max-hard-limit=256
performance.client-io-threads=on
nfs.disable=on
transport.address-family=inet
server.allow-insecure=on
features.quota=on
features.inode-quota=on
features.quota-deem-statfs=on
performance.readdir-ahead=on
performance.parallel-readdir=on
cluster.favorite-child-policy=mtime
features.bitrot=on
features.scrub=Active
brick-0=10.0.2.81:-mnt-sdc-brick
brick-1=10.0.2.82:-mnt-sdc-brick
brick-2=10.0.2.83:-mnt-sdc-brick
brick-3=10.0.2.81:-mnt-sdd-brick
brick-4=10.0.2.82:-mnt-sdd-brick
brick-5=10.0.2.83:-mnt-sdd-brick
brick-6=10.0.2.81:-mnt-sde-brick
brick-7=10.0.2.82:-mnt-sde-brick
brick-8=10.0.2.83:-mnt-sde-brick
"#;
}
// Feeds a sample of the textual volume-info output to `parse_volume_info` and
// compares the result against a hand-built `Volume`.
#[test]
fn test_parse_volume_info() {
let test_data = r#"
Volume Name: test
Type: Replicate
Volume ID: cae6868d-b080-4ea3-927b-93b5f1e3fe69
Status: Started
Number of Bricks: 1 x 2 = 2
Transport-type: tcp
Bricks:
Brick1: 172.31.41.135:/mnt/xvdf
Options Reconfigured:
features.inode-quota: off
features.quota: off
transport.address-family: inet
performance.readdir-ahead: on
nfs.disable: on
"#;
let result = parse_volume_info("test", test_data).unwrap();
let mut options_map: BTreeMap<String, String> = BTreeMap::new();
options_map.insert("features.inode-quota".to_string(), "off".to_string());
options_map.insert("features.quota".to_string(), "off".to_string());
options_map.insert("transport.address-family".to_string(), "inet".to_string());
options_map.insert("performance.readdir-ahead".to_string(), "on".to_string());
options_map.insert("nfs.disable".to_string(), "on".to_string());
let vol_info = Volume {
name: "test".to_string(),
vol_type: VolumeType::Replicate,
id: Uuid::parse_str("cae6868d-b080-4ea3-927b-93b5f1e3fe69").unwrap(),
status: "Started".to_string(),
transport: Transport::Tcp,
bricks: vec![Brick {
// NOTE(review): this peer does not come from the sample text; presumably
// `get_peer` returns a fixed peer under test — confirm in the peer module.
peer: Peer {
uuid: Uuid::parse_str("78f68270-201a-4d8a-bad3-7cded6e6b7d8").unwrap(),
hostname: "test_ip".to_string(),
status: State::Connected,
},
path: PathBuf::from("/mnt/xvdf"),
}],
options: options_map,
};
println!("vol_info: {:?}", vol_info);
assert_eq!(vol_info, result);
}
/// Reads `/var/lib/glusterd/vols/<volume>/info` and builds a `Volume` from its
/// `key=value` lines.
///
/// Only `transport-type` and `volume-id` are parsed so far. The volume name is
/// taken from the `volume` argument; status, bricks and options are left
/// empty, and the type falls back to what `VolumeType::from_str("")` returns.
///
/// # Errors
/// Fails if the file cannot be opened or read, if `volume-id` is missing or
/// not a valid UUID, or if `transport-type` is missing or unrecognised.
fn parse_volume_info2(volume: &str) -> Result<Volume, GlusterError> {
    let mut p = PathBuf::from("/var/lib/glusterd/vols");
    p.push(volume);
    p.push("info");
    let reader = BufReader::new(File::open(p)?);

    let mut transport = String::new();
    let mut volume_id = String::new();
    for line in reader.lines() {
        let line = line?;
        // Split once on '=' so values may themselves contain '='. A line
        // without '=' is skipped; indexing the split result used to panic.
        let mut fields = line.splitn(2, '=');
        let key = fields.next().unwrap_or("").trim();
        let value = match fields.next() {
            Some(v) => v.trim(),
            None => continue,
        };
        match key {
            "transport-type" => transport = value.to_string(),
            "volume-id" => volume_id = value.to_string(),
            _ => {}
        }
    }

    Ok(Volume {
        name: volume.to_string(),
        vol_type: VolumeType::from_str(""),
        // An empty string (key absent) is rejected by the UUID parser.
        id: Uuid::from_str(&volume_id)?,
        status: String::new(),
        transport: Transport::from_str(&transport)?,
        bricks: Vec::new(),
        options: BTreeMap::new(),
    })
}
/// Parses the textual output of `gluster volume info <volume>` into a `Volume`.
///
/// The output is read as three sections: top-level `key: value` fields, the
/// entries after a `Bricks:` line, and the entries after an
/// `Options Reconfigured:` line. Lines without a `": "` separator are ignored.
/// Each line is split once on the first `": "`, so values that themselves
/// contain `": "` are kept whole.
///
/// # Errors
/// * `GlusterError::NoVolumesPresent` if the output is "No volumes present".
/// * An error if the output says the volume does not exist, the volume id is
///   not a valid UUID, a brick entry is not `host:path`, the brick host cannot
///   be resolved to an IP, or the peer lookup fails.
fn parse_volume_info(volume: &str, output_str: &str) -> Result<Volume, GlusterError> {
    let mut transport_type = String::new();
    let mut volume_type = String::new();
    let mut volume_name = String::new();
    let mut volume_options: BTreeMap<String, String> = BTreeMap::new();
    let mut status = String::new();
    let mut bricks: Vec<Brick> = Vec::new();
    let mut id = Uuid::nil();

    if output_str.trim() == "No volumes present" {
        debug!("No volumes present");
        println!("No volumes present");
        return Err(GlusterError::NoVolumesPresent);
    }
    if output_str.trim() == format!("Volume {} does not exist", volume) {
        debug!("Volume {} does not exist", volume);
        println!("Volume {} does not exist", volume);
        return Err(GlusterError::new(format!(
            "Volume: {} does not exist",
            volume
        )));
    }

    let mut parser_state = ParseState::Root;
    for line in output_str.lines() {
        if line.is_empty() {
            continue;
        }
        // Section headers switch the parser state and carry no data.
        match line {
            "Bricks:" => {
                parser_state = ParseState::Bricks;
                continue;
            }
            "Options Reconfigured:" => {
                parser_state = ParseState::Options;
                continue;
            }
            _ => {}
        };

        // Everything else of interest has the shape `key: value`.
        let mut fields = line.splitn(2, ": ");
        let (name, value) = match (fields.next(), fields.next()) {
            (Some(name), Some(value)) => (name, value),
            _ => continue,
        };

        match parser_state {
            ParseState::Root => match name {
                "Volume Name" => volume_name = value.to_owned(),
                "Type" => volume_type = value.to_owned(),
                "Volume ID" => id = Uuid::parse_str(value)?,
                "Status" => status = value.to_owned(),
                // The CLI prints "Transport-type"; compare case-insensitively
                // so the field is picked up regardless of capitalisation.
                key if key.eq_ignore_ascii_case("Transport-type") => {
                    transport_type = value.to_owned()
                }
                // "Number of Bricks" and any other field are not used.
                _ => {}
            },
            ParseState::Bricks => {
                let brick_parts: Vec<&str> = value.split(':').collect();
                if brick_parts.len() != 2 {
                    // Report malformed entries to the caller instead of panicking.
                    return Err(GlusterError::new(format!(
                        "Failed to parse bricks from gluster vol info: {}",
                        value
                    )));
                }
                let mut hostname = brick_parts[0].trim().to_string();
                // Peers are looked up by address, so resolve names to an IP first.
                if hostname.parse::<IpAddr>().is_err() {
                    hostname = match resolve_to_ip(&hostname) {
                        Ok(ip_addr) => ip_addr,
                        Err(e) => {
                            return Err(GlusterError::new(format!(
                                "Failed to resolve hostname: \
                                 {}. Error: {}",
                                &hostname, e
                            )));
                        }
                    };
                }
                let peer: Peer = get_peer(&hostname)?;
                debug!("get_peer_by_ipaddr result: Peer: {:?}", peer);
                bricks.push(Brick {
                    peer,
                    path: PathBuf::from(brick_parts[1].to_string()),
                });
            }
            ParseState::Options => {
                volume_options.insert(name.to_string(), value.to_string());
            }
        }
    }

    Ok(Volume {
        name: volume_name,
        vol_type: VolumeType::new(&volume_type),
        id,
        status,
        transport: Transport::new(&transport_type),
        bricks,
        options: volume_options,
    })
}
/// Runs `gluster volume info <volume>` and parses its output.
///
/// # Errors
/// Returns `GlusterError::NoVolumesPresent` whenever the command exits
/// unsuccessfully (stderr is printed first); otherwise propagates UTF-8 and
/// parsing errors.
pub fn volume_info(volume: &str) -> Result<Volume, GlusterError> {
    let arg_list: Vec<String> = vec![
        "volume".to_string(),
        "info".to_string(),
        volume.to_string(),
    ];
    let output = run_command("gluster", &arg_list, true, false);
    if output.status.success() {
        let stdout_text: String = String::from_utf8(output.stdout)?;
        return parse_volume_info(volume, &stdout_text);
    }
    debug!("Volume info get command failed");
    println!(
        "Volume info get command failed with error: {}",
        String::from_utf8_lossy(&output.stderr)
    );
    Err(GlusterError::NoVolumesPresent)
}
/// Queries the quota daemon over its unix socket and returns the value stored
/// under "trusted.glusterfs.quota.size" in the reply, read as a big-endian u64.
///
/// # Errors
/// Fails if packing the request, connecting to
/// `/var/run/gluster/quotad.socket`, sending/receiving the record or unpacking
/// the reply fails, if the reply dict has no "trusted.glusterfs.quota.size"
/// entry, or if that entry cannot be read as a u64.
pub fn get_quota_usage(volume: &str) -> Result<u64, GlusterError> {
// Call header fields: transaction id, program number and program version.
let xid = 1;
let prog = rpc::GLUSTER_QUOTA_PROGRAM_NUMBER;
let vers = 1;
let verf = rpc::GlusterAuth {
flavor: rpc::AuthFlavor::AuthNull,
stuff: vec![0, 0, 0, 0],
};
let verf_bytes = verf.pack()?;
let creds = rpc::GlusterCred {
flavor: rpc::GLUSTER_V2_CRED_FLAVOR,
pid: 0,
uid: 0,
gid: 0,
groups: "".to_string(),
lock_owner: vec![0, 0, 0, 0],
};
let cred_bytes = creds.pack()?;
let mut call_bytes = rpc::pack_quota_callheader(
xid,
prog,
vers,
rpc::GlusterAggregatorCommand::GlusterAggregatorGetlimit,
cred_bytes,
verf_bytes,
)?;
// Request dictionary; every value is a byte string with a trailing NUL appended.
let mut dict: HashMap<String, Vec<u8>> = HashMap::with_capacity(4);
let mut gfid = "00000000-0000-0000-0000-000000000001"
.to_string()
.into_bytes();
gfid.push(0);
let mut name = volume.to_string().into_bytes();
name.push(0);
let mut version = "1.20000005".to_string().into_bytes();
version.push(0);
let mut vol_type = "5".to_string().into_bytes();
vol_type.push(0);
dict.insert("gfid".to_string(), gfid);
dict.insert("type".to_string(), vol_type);
// NOTE(review): the "volume-uuid" key is filled with the `volume` argument's
// bytes — confirm whether callers are expected to pass a UUID or a name.
dict.insert("volume-uuid".to_string(), name);
dict.insert("version".to_string(), version);
let quota_request = rpc::GlusterCliRequest { dict };
let quota_bytes = quota_request.pack()?;
// The packed request follows the call header in the same record.
for byte in quota_bytes {
call_bytes.push(byte);
}
let addr = Path::new("/var/run/gluster/quotad.socket");
let mut sock = UnixStream::connect(&addr)?;
let _send_bytes = rpc::sendrecord(&mut sock, &call_bytes)?;
let mut reply_bytes = rpc::recvrecord(&mut sock)?;
let mut cursor = Cursor::new(&mut reply_bytes[..]);
// The reply header is unpacked first so the cursor sits at the response body.
rpc::unpack_replyheader(&mut cursor)?;
let mut cli_response = rpc::GlusterCliResponse::unpack(&mut cursor)?;
let quota_size_bytes = match cli_response.dict.get_mut("trusted.glusterfs.quota.size") {
Some(s) => s,
None => {
return Err(GlusterError::new(
"trusted.glusterfs.quota.size was not returned from \
quotad"
.to_string(),
));
}
};
// Only the first 8 bytes of the value are consumed.
let mut size_cursor = Cursor::new(&mut quota_size_bytes[..]);
let usage = size_cursor.read_u64::<BigEndian>()?;
Ok(usage)
}
/// Runs `gluster volume quota <volume> list` and parses the resulting table.
///
/// # Errors
/// If the command exits unsuccessfully its stderr (lossily decoded) becomes
/// the error message; invalid UTF-8 on stdout is also an error.
pub fn quota_list(volume: &str) -> Result<Vec<Quota>, GlusterError> {
    let args_list: Vec<String> = vec![
        "volume".to_string(),
        "quota".to_string(),
        volume.to_string(),
        "list".to_string(),
    ];
    let output = run_command("gluster", &args_list, true, false);
    if !output.status.success() {
        let stderr_text = String::from_utf8_lossy(&output.stderr).into_owned();
        debug!(
            "Volume quota list command failed with error: {}",
            stderr_text
        );
        return Err(GlusterError::new(stderr_text));
    }
    let stdout_text: String = String::from_utf8(output.stdout)?;
    Ok(parse_quota_list(volume, &stdout_text))
}
// Parses a sample quota table (header row, separator row, one data row) and
// expects a single quota for "/" with a 1024-byte limit and 0 bytes used.
#[test]
fn test_quota_list() {
let test_data = r#"
Path Hard-limit Soft-limit Used Available Soft-limit exceeded? Hard-limit exceeded?
----------------------------------------------------------------------------------------------
/ 1.0KB 80%(819Bytes) 0Bytes 1.0KB No No
"#;
let result = parse_quota_list("test", test_data);
let quotas = vec![Quota {
path: PathBuf::from("/"),
limit: 1024,
used: 0,
}];
println!("quota_list: {:?}", result);
assert_eq!(quotas, result);
}
/// Parses the table printed by `gluster volume quota <volume> list`.
///
/// Returns an empty list when the output is the "No quota configured" message.
/// Otherwise every data row with more than three whitespace-separated columns
/// yields a `Quota`: column 0 is the path, column 1 the hard limit, column 3
/// the used size. Sizes that `translate_to_bytes` cannot convert count as 0.
///
/// The header row is recognised by its first two column titles, so it is
/// skipped whether or not the CLI indents it. Separator rows (leading `-`)
/// and any line starting with a space are skipped as well.
fn parse_quota_list(volume: &str, output_str: &str) -> Vec<Quota> {
    let mut quota_list = Vec::new();
    if output_str.trim() == format!("quota: No quota configured on volume {}", volume) {
        return quota_list;
    }
    for line in output_str.lines() {
        if line.is_empty() || line.starts_with(' ') || line.starts_with('-') {
            continue;
        }
        let parts: Vec<&str> = line.split_whitespace().collect();
        if parts.len() <= 3 {
            continue;
        }
        // An unindented header would otherwise become a quota named "Path".
        if parts[0] == "Path" && parts[1] == "Hard-limit" {
            continue;
        }
        let limit: f64 = translate_to_bytes(parts[1]).unwrap_or(0.0);
        let used: f64 = translate_to_bytes(parts[3]).unwrap_or(0.0);
        quota_list.push(Quota {
            path: PathBuf::from(parts[0].to_string()),
            // Float-to-integer casts drop the fractional part.
            limit: limit as u64,
            used: used as u64,
        });
    }
    quota_list
}
/// Runs `gluster volume bitrot <volume> enable` and returns the processed
/// command result.
pub fn volume_enable_bitrot(volume: &str) -> Result<i32, GlusterError> {
    let command_args: Vec<&str> = vec!["volume", "bitrot", volume, "enable"];
    let output = run_command("gluster", &command_args, true, false);
    process_output(output)
}
/// Runs `gluster volume bitrot <volume> disable` and returns the processed
/// command result.
pub fn volume_disable_bitrot(volume: &str) -> Result<i32, GlusterError> {
    let command_args: Vec<&str> = vec!["volume", "bitrot", volume, "disable"];
    let output = run_command("gluster", &command_args, true, false);
    process_output(output)
}
/// Runs `gluster volume bitrot <volume> <option> <value>`, where the option
/// name and value come from `setting.to_string()` and `setting.value()`.
pub fn volume_set_bitrot_option(volume: &str, setting: &BitrotOption) -> Result<i32, GlusterError> {
    let mut command_args: Vec<String> = Vec::with_capacity(5);
    command_args.push("volume".to_string());
    command_args.push("bitrot".to_string());
    command_args.push(volume.to_string());
    command_args.push(setting.to_string());
    command_args.push(setting.value());
    let output = run_command("gluster", &command_args, true, true);
    process_output(output)
}
/// Runs `gluster volume quota <volume> enable` and returns the processed
/// command result.
pub fn volume_enable_quotas(volume: &str) -> Result<i32, GlusterError> {
    let command_args: Vec<&str> = vec!["volume", "quota", volume, "enable"];
    let output = run_command("gluster", &command_args, true, false);
    process_output(output)
}
/// Reports whether quotas are enabled on `volume`.
///
/// Returns `true` only when the volume's `features.quota` option is exactly
/// "on"; a missing option or any other value counts as disabled.
///
/// # Errors
/// Propagates any failure from `volume_info`.
pub fn volume_quotas_enabled(volume: &str) -> Result<bool, GlusterError> {
    let vol_info = volume_info(volume)?;
    let enabled = vol_info
        .options
        .get("features.quota")
        .map(|setting| setting == "on")
        .unwrap_or(false);
    Ok(enabled)
}
/// Runs `gluster volume quota <volume> disable` and returns the processed
/// command result.
pub fn volume_disable_quotas(volume: &str) -> Result<i32, GlusterError> {
    let command_args: Vec<&str> = vec!["volume", "quota", volume, "disable"];
    let output = run_command("gluster", &command_args, true, false);
    process_output(output)
}
/// Runs `gluster volume quota <volume> remove <path>`, with the path rendered
/// through `Path::display`.
pub fn volume_remove_quota(volume: &str, path: &Path) -> Result<i32, GlusterError> {
    let quota_path = path.display().to_string();
    let command_args: Vec<&str> = vec!["volume", "quota", volume, "remove", quota_path.as_str()];
    let output = run_command("gluster", &command_args, true, false);
    process_output(output)
}
/// Runs `gluster volume quota <volume> limit-usage <path> <size>`, passing
/// `size` as a plain decimal number.
pub fn volume_add_quota(volume: &str, path: &Path, size: u64) -> Result<i32, GlusterError> {
    let quota_path = path.display().to_string();
    let limit = size.to_string();
    let mut command_args: Vec<&str> = vec!["volume", "quota", volume, "limit-usage"];
    command_args.push(quota_path.as_str());
    command_args.push(limit.as_str());
    let output = run_command("gluster", &command_args, true, false);
    process_output(output)
}
// Parses a sample of volume-status output and checks the two `Brick ...` rows.
// The self-heal daemon rows and the task section are expected to yield no entries.
#[test]
fn test_parse_volume_status() {
let test_data = r#"
Gluster process TCP Port RDMA Port Online Pid
------------------------------------------------------------------------------
Brick 172.31.46.33:/mnt/xvdf 49152 0 Y 14228
Brick 172.31.19.130:/mnt/xvdf 49152 0 Y 14446
Self-heal Daemon on localhost N/A N/A Y 14248
Self-heal Daemon on ip-172-31-19-130.us-wes
t-2.compute.internal N/A N/A Y 14466
Task Status of Volume test
------------------------------------------------------------------------------
There are no active volume tasks
"#;
let result = parse_volume_status(test_data).unwrap();
println!("status: {:?}", result);
assert_eq!(result[0].brick.peer.hostname, "172.31.46.33".to_string());
assert_eq!(result[0].tcp_port, 49152);
assert_eq!(result[0].rdma_port, 0);
assert_eq!(result[0].online, true);
assert_eq!(result[0].pid, 14228);
assert_eq!(result[1].brick.peer.hostname, "172.31.19.130".to_string());
assert_eq!(result[1].tcp_port, 49152);
assert_eq!(result[1].rdma_port, 0);
assert_eq!(result[1].online, true);
assert_eq!(result[1].pid, 14446);
}
/// Checks whether a brick may be removed from `volume`.
///
/// Currently this only verifies that `gluster vol status <volume>` succeeds
/// and that its output parses; `_brick` is not inspected and the answer is
/// always `Ok(true)` when no error occurs.
///
/// # Errors
/// Returns the command's stderr as an error when it exits unsuccessfully, and
/// propagates UTF-8 or status-parsing failures.
pub fn ok_to_remove(volume: &str, _brick: &Brick) -> Result<bool, GlusterError> {
    let status_args: Vec<&str> = vec!["vol", "status", volume];
    let output = run_command("gluster", &status_args, true, false);
    if output.status.success() {
        let stdout_text = String::from_utf8(output.stdout)?;
        // Parsed only for validation; the brick statuses are not used yet.
        let _statuses = parse_volume_status(&stdout_text)?;
        Ok(true)
    } else {
        let stderr_text = String::from_utf8(output.stderr)?;
        Err(GlusterError::new(stderr_text))
    }
}
/// Extracts one `BrickStatus` per `Brick <host>:<path> <tcp> <rdma> <Y|N> <pid>`
/// row found in the output of `gluster vol status`.
///
/// Rows that do not match that shape are ignored. That includes daemon rows,
/// headers, separators, and brick rows whose port or pid columns are not
/// numeric (for example `N/A`).
///
/// The returned peers are synthetic: each gets a fresh random UUID and
/// `State::Unknown`, since the status table carries neither.
///
/// # Errors
/// Fails if the pattern cannot be compiled or if a port or pid column does not
/// fit in a `u16`.
// NOTE(review): the pid is parsed as u16 like the ports, so pids above 65535
// are rejected — presumably dictated by the `BrickStatus` field type; confirm.
fn parse_volume_status(output_str: &str) -> Result<Vec<BrickStatus>, GlusterError> {
    // Compiled once up front rather than once per line. Hostnames and paths
    // may contain '.', '-' (and '_' in paths), so such bricks are not skipped.
    let brick_regex = Regex::new(concat!(
        r"Brick\s+(?P<hostname>[a-zA-Z0-9.-]+)",
        r":(?P<path>[/a-zA-Z0-9_.-]+)",
        r"\s+(?P<tcp>[0-9]+)\s+(?P<rdma>[0-9]+)\s+(?P<online>[YN])\s+(?P<pid>[0-9]+)"
    ))?;

    let mut bricks: Vec<BrickStatus> = Vec::new();
    for line in output_str.lines() {
        // Cheap rejection of header and separator rows before the regex runs.
        if line.starts_with("Status") || line.starts_with("Gluster") || line.starts_with('-') {
            continue;
        }
        let caps = match brick_regex.captures(line) {
            Some(caps) => caps,
            None => continue,
        };
        // None of the named groups is optional, so a successful match
        // guarantees every one of them is present and indexing cannot panic.
        let peer = Peer {
            uuid: Uuid::new_v4(),
            hostname: caps["hostname"].to_string(),
            status: State::Unknown,
        };
        let brick = Brick {
            peer,
            path: PathBuf::from(&caps["path"]),
        };
        bricks.push(BrickStatus {
            brick,
            tcp_port: u16::from_str(&caps["tcp"])?,
            rdma_port: u16::from_str(&caps["rdma"])?,
            online: &caps["online"] == "Y",
            pid: u16::from_str(&caps["pid"])?,
        });
    }
    Ok(bricks)
}
/// Runs `gluster vol status <volume>` and returns the parsed brick statuses.
///
/// # Errors
/// Returns the command's stderr as an error when it exits unsuccessfully, and
/// propagates UTF-8 or status-parsing failures.
pub fn volume_status(volume: &str) -> Result<Vec<BrickStatus>, GlusterError> {
    let status_args: Vec<&str> = vec!["vol", "status", volume];
    let output = run_command("gluster", &status_args, true, false);
    if output.status.success() {
        let stdout_text = String::from_utf8(output.stdout)?;
        parse_volume_status(&stdout_text)
    } else {
        let stderr_text = String::from_utf8(output.stderr)?;
        Err(GlusterError::new(stderr_text))
    }
}
/// Removes `bricks` from `volume`, one `gluster volume remove-brick` call per
/// brick.
///
/// Each call names the brick being removed and ends with a single action
/// keyword: `force` when `force` is true, otherwise `start`. The CLI accepts
/// only one action, so the two are never sent together.
///
/// Returns `Ok(0)` once every brick has been processed.
///
/// # Errors
/// Fails if the brick list is empty, if `ok_to_remove` errors or refuses a
/// brick, or if any remove-brick command fails. Processing stops at the first
/// failure.
pub fn volume_remove_brick(
    volume: &str,
    bricks: Vec<Brick>,
    force: bool,
) -> Result<i32, GlusterError> {
    if bricks.is_empty() {
        return Err(GlusterError::new(
            "The brick list is empty. Not shrinking volume".to_string(),
        ));
    }
    let action = if force { "force" } else { "start" };
    for brick in bricks {
        if !ok_to_remove(volume, &brick)? {
            return Err(GlusterError::new(
                "Unable to remove brick due to redundancy failure".to_string(),
            ));
        }
        // The brick must be on the command line; it used to be left out.
        let brick_arg = brick.to_string();
        let arg_list: Vec<&str> = vec![
            "volume",
            "remove-brick",
            volume,
            brick_arg.as_str(),
            action,
        ];
        // Surface command failures instead of silently discarding them.
        process_output(run_command("gluster", &arg_list, true, true))?;
    }
    Ok(0)
}
/// Runs `gluster volume add-brick <volume> <brick>... [force]`.
///
/// # Errors
/// Fails without running anything if `bricks` is empty; otherwise returns
/// whatever `process_output` reports for the command.
pub fn volume_add_brick(volume: &str, bricks: &[Brick], force: bool) -> Result<i32, GlusterError> {
    if bricks.is_empty() {
        return Err(GlusterError::new(
            "The brick list is empty. Not expanding volume".to_string(),
        ));
    }
    let mut command_args: Vec<String> = vec![
        "volume".to_string(),
        "add-brick".to_string(),
        volume.to_string(),
    ];
    command_args.extend(bricks.iter().map(|brick| brick.to_string()));
    if force {
        command_args.push("force".to_string());
    }
    let output = run_command("gluster", &command_args, true, true);
    process_output(output)
}
/// Runs `gluster volume start <volume>`, appending `force` when requested.
pub fn volume_start(volume: &str, force: bool) -> Result<i32, GlusterError> {
    let command_args: Vec<&str> = if force {
        vec!["volume", "start", volume, "force"]
    } else {
        vec!["volume", "start", volume]
    };
    let output = run_command("gluster", &command_args, true, true);
    process_output(output)
}
/// Runs `gluster volume stop <volume>`, appending `force` when requested.
pub fn volume_stop(volume: &str, force: bool) -> Result<i32, GlusterError> {
    let command_args: Vec<&str> = if force {
        vec!["volume", "stop", volume, "force"]
    } else {
        vec!["volume", "stop", volume]
    };
    let output = run_command("gluster", &command_args, true, true);
    process_output(output)
}
/// Runs `gluster volume delete <volume>` and returns the processed command
/// result.
pub fn volume_delete(volume: &str) -> Result<i32, GlusterError> {
    let command_args: Vec<&str> = vec!["volume", "delete", volume];
    let output = run_command("gluster", &command_args, true, true);
    process_output(output)
}
/// Unimplemented stub: the body is empty, `_volume` is ignored and nothing is run.
pub fn volume_rebalance(_volume: &str) {
}
fn volume_create<T: ToString>(
volume: &str,
options: &HashMap<VolumeTranslator, T>,
transport: &Transport,
bricks: &[Brick],
force: bool,
) -> Result<i32, GlusterError> {
if bricks.is_empty() {
return Err(GlusterError::new(
"The brick list is empty. Not creating volume".to_string(),
));
}
let mut arg_list: Vec<String> = Vec::new();
arg_list.push("volume".to_string());
arg_list.push("create".to_string());
arg_list.push(volume.to_string());
for (key, value) in options.iter() {
arg_list.push(key.clone().to_string());
arg_list.push(value.to_string());
}
arg_list.push("transport".to_string());
arg_list.push(transport.clone().to_string());
for brick in bricks.iter() {
arg_list.push(brick.to_string());
}
if force {
arg_list.push("force".to_string());
}
process_output(run_command("gluster", &arg_list, true, true))
}
/// Runs `gluster volume set <volume> <option> <value>`, where the option name
/// and value come from `option.to_string()` and `option.value()`.
fn vol_set(volume: &str, option: &GlusterOption) -> Result<i32, GlusterError> {
    let command_args: Vec<String> = vec![
        "volume".to_string(),
        "set".to_string(),
        volume.to_string(),
        option.to_string(),
        option.value(),
    ];
    let output = run_command("gluster", &command_args, true, true);
    process_output(output)
}
/// Applies every option in `settings` to `volume`.
///
/// All settings are attempted even if some fail. Returns `Ok(0)` when every
/// one succeeds.
///
/// # Errors
/// If any setting fails, the error messages of all failures are joined with
/// newlines into a single error.
pub fn volume_set_options(volume: &str, settings: &[GlusterOption]) -> Result<i32, GlusterError> {
    let failures: Vec<String> = settings
        .iter()
        .filter_map(|gluster_opt| vol_set(volume, gluster_opt).err())
        .map(|failure| failure.to_string())
        .collect();
    if failures.is_empty() {
        Ok(0)
    } else {
        Err(GlusterError::new(failures.join("\n")))
    }
}
/// Creates a volume with a `replica <replica_count>` translator option via
/// `volume_create`.
pub fn volume_create_replicated(
    volume: &str,
    replica_count: usize,
    transport: &Transport,
    bricks: &[Brick],
    force: bool,
) -> Result<i32, GlusterError> {
    let translators: HashMap<VolumeTranslator, usize> =
        vec![(VolumeTranslator::Replica, replica_count)]
            .into_iter()
            .collect();
    volume_create(volume, &translators, transport, bricks, force)
}
/// Creates a volume with `replica <replica_count>` and
/// `arbiter <arbiter_count>` translator options via `volume_create`.
pub fn volume_create_arbiter(
    volume: &str,
    replica_count: usize,
    arbiter_count: usize,
    transport: &Transport,
    bricks: &[Brick],
    force: bool,
) -> Result<i32, GlusterError> {
    let translators: HashMap<VolumeTranslator, usize> = vec![
        (VolumeTranslator::Replica, replica_count),
        (VolumeTranslator::Arbiter, arbiter_count),
    ]
    .into_iter()
    .collect();
    volume_create(volume, &translators, transport, bricks, force)
}
/// Creates a volume with a `stripe <stripe>` translator option via
/// `volume_create`.
pub fn volume_create_striped(
    volume: &str,
    stripe: usize,
    transport: &Transport,
    bricks: &[Brick],
    force: bool,
) -> Result<i32, GlusterError> {
    let translators: HashMap<VolumeTranslator, usize> =
        vec![(VolumeTranslator::Stripe, stripe)]
            .into_iter()
            .collect();
    volume_create(volume, &translators, transport, bricks, force)
}
/// Creates a volume with `stripe <stripe>` and `replica <replica>` translator
/// options via `volume_create`.
pub fn volume_create_striped_replicated(
    volume: &str,
    stripe: usize,
    replica: usize,
    transport: &Transport,
    bricks: &[Brick],
    force: bool,
) -> Result<i32, GlusterError> {
    let translators: HashMap<VolumeTranslator, usize> = vec![
        (VolumeTranslator::Stripe, stripe),
        (VolumeTranslator::Replica, replica),
    ]
    .into_iter()
    .collect();
    volume_create(volume, &translators, transport, bricks, force)
}
/// Creates a volume with no translator options via `volume_create`.
pub fn volume_create_distributed(
    volume: &str,
    transport: &Transport,
    bricks: &[Brick],
    force: bool,
) -> Result<i32, GlusterError> {
    // An empty map means no `<translator> <value>` pairs are emitted.
    let no_translators = HashMap::<VolumeTranslator, String>::new();
    volume_create(volume, &no_translators, transport, bricks, force)
}
/// Creates a volume with `disperse <disperse>` and `redundancy <redundancy>`
/// translator options via `volume_create`.
pub fn volume_create_erasure(
    volume: &str,
    disperse: usize,
    redundancy: usize,
    transport: &Transport,
    bricks: &[Brick],
    force: bool,
) -> Result<i32, GlusterError> {
    let translators: HashMap<VolumeTranslator, usize> = vec![
        (VolumeTranslator::Disperse, disperse),
        (VolumeTranslator::Redundancy, redundancy),
    ]
    .into_iter()
    .collect();
    volume_create(volume, &translators, transport, bricks, force)
}