#![allow(missing_docs, proc_macro_derive_resolution_fallback)]
use diesel::{pg::PgConnection, prelude::*, sql_query, sql_types::*, QueryableByName};
use std::result;
use try_from::TryInto;
use crate::common::*;
#[derive(Debug, QueryableByName)]
pub struct ShardInfo {
#[sql_type = "Bigint"]
shardid: i64,
#[sql_type = "Nullable<Text>"]
shardname: Option<String>,
#[sql_type = "Nullable<Text>"]
nodename: Option<String>,
#[sql_type = "Nullable<Integer>"]
nodeport: Option<i32>,
}
impl ShardInfo {
pub fn name(&self) -> Result<&str> {
self.shardname
.as_ref()
.map(|n| &n[..])
.ok_or_else(|| format_err!("missing shard name for {:?}", self))
}
pub fn url(&self, controller_url: &Url) -> Result<Url> {
match (&self.nodename, self.nodeport) {
(Some(nodename), Some(nodeport)) => {
let port_result: result::Result<u16, _> = nodeport.try_into();
let port = port_result.context("shard port is out of range")?;
let mut url = controller_url.to_owned();
url.set_host(Some(nodename))
.context("could not set shard host")?;
url.set_port(Some(port))
.map_err(|_| format_err!("could not set shard port"))?;
Ok(url)
}
_ => Err(format_err!("missing data about shard {:?}", self)),
}
}
}
const SHARDS_FOR_TABLE_SQL: &str = r#"
SELECT
shard.shardid AS shardid,
shard_name(shard.logicalrelid, shard.shardid) AS shardname,
placement.nodename AS nodename,
placement.nodeport AS nodeport
FROM pg_dist_shard shard
INNER JOIN pg_dist_shard_placement placement
ON (shard.shardid = placement.shardid)
WHERE shard.logicalrelid::regclass = $1::regclass
"#;
pub fn citus_shards(table: &str, dconn: &PgConnection) -> Result<Vec<ShardInfo>> {
Ok(sql_query(SHARDS_FOR_TABLE_SQL)
.bind::<Text, _>(table)
.get_results::<ShardInfo>(dconn)
.context("error querying for Citus shard placement")?)
}