use byteorder::{BigEndian, ByteOrder};
use cannyls::lump::LumpId;
use fibers_rpc::client::Options as RpcOptions;
use frugalos_raft::NodeId;
use libfrugalos::entity::object::ObjectVersion;
use libfrugalos::time::Seconds;
use raftlog::cluster::ClusterMembers;
use siphasher::sip::SipHasher;
use std::hash::{Hash, Hasher};
use std::ops::Range;
use std::time::Duration;
/// Tag stored in the most significant byte of every content lump id built by
/// `make_lump_id` and `make_available_object_lump_id_range`.
pub(crate) const LUMP_NAMESPACE_CONTENT: u8 = 0x01;
/// A member of a segment cluster: a node identifier paired with a device name.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ClusterMember {
    /// Identifier of the member node.
    pub node: NodeId,
    /// Name of the device associated with this member.
    pub device: String,
}
impl ClusterMember {
    /// Builds the content lump id for object `version` on this member's node.
    ///
    /// Thin wrapper over the module-level `make_lump_id`.
    pub(crate) fn make_lump_id(&self, version: ObjectVersion) -> LumpId {
        let node = &self.node;
        make_lump_id(node, version)
    }

    /// Returns the content-namespace lump id range available on this member's node.
    ///
    /// Thin wrapper over the module-level `make_available_object_lump_id_range`.
    pub(crate) fn make_available_object_lump_id_range(&self) -> Range<LumpId> {
        let node = &self.node;
        make_available_object_lump_id_range(node)
    }
}
/// Builds a content lump id from a node's local id and an object version.
///
/// Byte layout of the resulting 128-bit id (big-endian):
/// - byte 0: `LUMP_NAMESPACE_CONTENT` (replaces the first byte of the local id)
/// - bytes 1..7: bytes 1..7 of `node.local_id`
/// - byte 7: zero
/// - bytes 8..16: `version.0`
pub(crate) fn make_lump_id(node: &NodeId, version: ObjectVersion) -> LumpId {
    let mut bytes = [0u8; 16];
    let (upper, lower) = bytes.split_at_mut(8);
    upper[..7].copy_from_slice(node.local_id.as_slice());
    // The leading byte is overwritten with the namespace tag.
    upper[0] = LUMP_NAMESPACE_CONTENT;
    BigEndian::write_u64(lower, version.0);
    LumpId::new(BigEndian::read_u128(&bytes))
}
/// Recovers the object version that `make_lump_id` stored in the low 64 bits of `lump_id`.
pub(crate) fn get_object_version_from_lump_id(lump_id: LumpId) -> ObjectVersion {
    // Truncating `u128 -> u64` keeps exactly the low 64 bits, i.e. bytes 8..16
    // of the big-endian representation.
    let low_bits = lump_id.as_u128() as u64;
    ObjectVersion(low_bits)
}
/// Returns the node's available lump id range with both bounds moved into the
/// content namespace.
///
/// Both bounds come from `LocalNodeId::to_available_lump_id_range`. The most significant
/// byte of each bound is replaced with `LUMP_NAMESPACE_CONTENT`.
pub fn make_available_object_lump_id_range(node: &NodeId) -> Range<LumpId> {
    let with_content_namespace = |lump_id: LumpId| -> LumpId {
        let mut bytes = [0u8; 16];
        BigEndian::write_u128(&mut bytes, lump_id.as_u128());
        bytes[0] = LUMP_NAMESPACE_CONTENT;
        LumpId::new(BigEndian::read_u128(&bytes))
    };
    let Range { start, end } = node.local_id.to_available_lump_id_range();
    with_content_namespace(start)..with_content_namespace(end)
}
/// Client-side limits for talking to cannyls.
///
/// Fields are (de)serialized with a `cannyls_` prefix and fall back to the
/// `default_cannyls_*` functions when missing.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct CannyLsClientConfig {
    /// Device queue length limit (key `cannyls_device_max_queue_len`, default 4096).
    /// Its consumer is outside this file.
    #[serde(
        rename = "cannyls_device_max_queue_len",
        default = "default_cannyls_device_max_queue_len"
    )]
    pub device_max_queue_len: usize,
    /// RPC queue length limit (key `cannyls_rpc_max_queue_len`, default 512);
    /// copied into `RpcOptions::max_queue_len` by `rpc_options`.
    #[serde(
        rename = "cannyls_rpc_max_queue_len",
        default = "default_cannyls_rpc_max_queue_len"
    )]
    pub rpc_max_queue_len: u64,
}
impl CannyLsClientConfig {
pub fn rpc_options(&self) -> RpcOptions {
RpcOptions {
max_queue_len: Some(self.rpc_max_queue_len),
..Default::default()
}
}
}
impl Default for CannyLsClientConfig {
fn default() -> Self {
Self {
device_max_queue_len: default_cannyls_device_max_queue_len(),
rpc_max_queue_len: default_cannyls_rpc_max_queue_len(),
}
}
}
/// Default for `CannyLsClientConfig::device_max_queue_len`.
fn default_cannyls_device_max_queue_len() -> usize {
    4_096
}
/// Default for `CannyLsClientConfig::rpc_max_queue_len`.
fn default_cannyls_rpc_max_queue_len() -> u64 {
    0x200
}
/// Policy used when sending requests to the MDS.
///
/// Serialized as an internally tagged object whose `type` field is the
/// lowercase variant name (`"conservative"` or `"speculative"`).
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum MdsRequestPolicy {
    /// The default policy. Its runtime semantics are defined by the MDS client,
    /// not in this file.
    Conservative,
    /// Policy parameterized by a timeout. Its runtime semantics are defined by
    /// the MDS client, not in this file.
    Speculative {
        /// Serialized as `timeout_millis`; defaults to 1 second.
        #[serde(
            rename = "timeout_millis",
            default = "default_mds_client_request_timeout",
            with = "frugalos_core::serde_ext::duration_millis"
        )]
        timeout: Duration,
    },
}
impl Default for MdsRequestPolicy {
fn default() -> Self {
MdsRequestPolicy::Conservative
}
}
/// Configuration of the MDS client.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct MdsClientConfig {
    /// Timeout for content puts, serialized as `put_content_timeout_secs`
    /// (default 60 seconds).
    #[serde(
        rename = "put_content_timeout_secs",
        default = "default_mds_client_put_content_timeout"
    )]
    pub put_content_timeout: Seconds,
    /// Fallback request policy (defaults to `MdsRequestPolicy::Conservative`).
    #[serde(default)]
    pub default_request_policy: MdsRequestPolicy,
    /// Request policy for GET requests (defaults to `MdsRequestPolicy::Conservative`).
    #[serde(default)]
    pub get_request_policy: MdsRequestPolicy,
    /// Request policy for HEAD requests (defaults to `MdsRequestPolicy::Conservative`).
    #[serde(default)]
    pub head_request_policy: MdsRequestPolicy,
}
/// Default timeout of `MdsRequestPolicy::Speculative` (one second).
fn default_mds_client_request_timeout() -> Duration {
    Duration::from_millis(1_000)
}
impl Default for MdsClientConfig {
fn default() -> Self {
MdsClientConfig {
put_content_timeout: default_mds_client_put_content_timeout(),
default_request_policy: Default::default(),
get_request_policy: Default::default(),
head_request_policy: Default::default(),
}
}
}
/// Default for `MdsClientConfig::put_content_timeout` (60 seconds).
fn default_mds_client_put_content_timeout() -> Seconds {
    Seconds(60)
}
/// Configuration of the client for dispersed (fragmented) storage.
///
/// Timeouts are serialized in milliseconds and default to 2 seconds. The
/// cannyls settings are flattened into the same object.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct DispersedClientConfig {
    /// GET timeout (key `get_timeout_millis`).
    #[serde(
        rename = "get_timeout_millis",
        default = "default_dispersed_client_get_timeout",
        with = "frugalos_core::serde_ext::duration_millis"
    )]
    pub get_timeout: Duration,
    /// HEAD timeout (key `head_timeout_millis`).
    #[serde(
        rename = "head_timeout_millis",
        default = "default_dispersed_client_head_timeout",
        with = "frugalos_core::serde_ext::duration_millis"
    )]
    pub head_timeout: Duration,
    /// Fragment-count timeout (key `count_fragments_timeout_millis`).
    #[serde(
        rename = "count_fragments_timeout_millis",
        default = "default_dispersed_client_count_fragments_timeout",
        with = "frugalos_core::serde_ext::duration_millis"
    )]
    pub count_fragments_timeout: Duration,
    /// cannyls client limits, flattened into this object's keys.
    #[serde(flatten)]
    pub cannyls: CannyLsClientConfig,
}
impl Default for DispersedClientConfig {
fn default() -> Self {
DispersedClientConfig {
get_timeout: default_dispersed_client_get_timeout(),
head_timeout: default_dispersed_client_head_timeout(),
count_fragments_timeout: default_dispersed_client_count_fragments_timeout(),
cannyls: Default::default(),
}
}
}
/// Default for `DispersedClientConfig::get_timeout` (two seconds).
fn default_dispersed_client_get_timeout() -> Duration {
    Duration::from_millis(2_000)
}
/// Default for `DispersedClientConfig::head_timeout` (two seconds).
fn default_dispersed_client_head_timeout() -> Duration {
    Duration::from_millis(2_000)
}
/// Default for `DispersedClientConfig::count_fragments_timeout` (two seconds).
fn default_dispersed_client_count_fragments_timeout() -> Duration {
    Duration::from_millis(2_000)
}
/// Configuration of the client for replicated storage.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Default)]
pub struct ReplicatedClientConfig {
    /// cannyls client limits, flattened into this object's keys.
    #[serde(flatten)]
    pub cannyls: CannyLsClientConfig,
}
/// Aggregated configuration for a segment client.
#[allow(missing_docs)]
#[derive(Debug, Clone)]
pub struct ClientConfig {
    // Cluster membership; `to_raft_cluster_members` derives raft node ids from it.
    pub cluster: ClusterConfig,
    pub dispersed_client: DispersedClientConfig,
    pub replicated_client: ReplicatedClientConfig,
    // Storage layout of the segment (metadata / replicated / dispersed).
    pub storage: Storage,
    pub mds: MdsClientConfig,
}
impl ClientConfig {
    /// Converts every cluster member's node id into a raft node id and collects
    /// the result as raft cluster members.
    pub fn to_raft_cluster_members(&self) -> ClusterMembers {
        let members = &self.cluster.members;
        members
            .iter()
            .map(|member| member.node.to_raft_node_id())
            .collect()
    }
}
/// Membership of a segment cluster.
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterConfig {
    // Members in configuration order; `candidates` iterates them in a rotated order.
    pub members: Vec<ClusterMember>,
}
impl ClusterConfig {
    /// Returns every member exactly once, in configuration order rotated to start
    /// at an index derived from `version`.
    ///
    /// The starting index is `SipHash(version.0) mod members.len()`, computed
    /// with a `SipHasher::new()` hasher. An empty cluster yields an empty
    /// iterator instead of panicking.
    pub fn candidates(&self, version: ObjectVersion) -> impl Iterator<Item = &ClusterMember> {
        let start = if self.members.is_empty() {
            // Avoid a remainder-by-zero panic; `Candidates` yields nothing for an empty slice.
            0
        } else {
            let mut hasher = SipHasher::new();
            version.0.hash(&mut hasher);
            // Reduce in u64 before narrowing. Casting the hash to `usize` first would
            // truncate it on 32-bit targets and make the start index depend on pointer
            // width. The remainder is < len, so narrowing back to usize is lossless.
            (hasher.finish() % self.members.len() as u64) as usize
        };
        Candidates::new(&self.members, start)
    }
}
/// Iterator over a member slice that starts at an arbitrary position and wraps around.
#[derive(Debug)]
struct Candidates<'a> {
    members: &'a [ClusterMember],
    // Next (unwrapped) position to yield; reduced modulo `members.len()` when indexing.
    current: usize,
    // One past the last unwrapped position: `start + members.len()`.
    end: usize,
}
impl<'a> Candidates<'a> {
fn new(members: &'a [ClusterMember], start: usize) -> Self {
Candidates {
members,
current: start,
end: start + members.len(),
}
}
}
impl<'a> Iterator for Candidates<'a> {
    type Item = &'a ClusterMember;

    /// Yields the member at `current mod len`, then advances. Stops after
    /// `members.len()` items.
    fn next(&mut self) -> Option<Self::Item> {
        if self.current == self.end {
            return None;
        }
        let index = self.current % self.members.len();
        self.current += 1;
        Some(&self.members[index])
    }

    /// The remaining count is known exactly, which lets `collect()` preallocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // `current` starts at `start` and only increments while `current != end`,
        // so `current <= end` always holds.
        let remaining = self.end - self.current;
        (remaining, Some(remaining))
    }
}

impl<'a> ExactSizeIterator for Candidates<'a> {}
/// The subset of candidate members taking part in a dispersed object,
/// borrowed from a caller-provided slice.
#[derive(Debug)]
pub struct Participants<'a> {
    members: &'a [ClusterMember],
}
impl<'a> Participants<'a> {
    /// Selects the first `fragments` entries of `members` as participants.
    ///
    /// # Panics
    ///
    /// Panics if `fragments` is greater than `members.len()`.
    pub fn dispersed(members: &'a [ClusterMember], fragments: u8) -> Self {
        let selected = &members[..usize::from(fragments)];
        Participants { members: selected }
    }

    /// Returns the position of `node_id` within the participants, or `None`
    /// if the node is not a participant.
    pub fn fragment_index(&self, node_id: &NodeId) -> Option<usize> {
        self.members
            .iter()
            .position(|member| &member.node == node_id)
    }

    /// Returns clones of every participant whose node is not `local_node`,
    /// keeping participant order.
    pub fn spares(&self, local_node: &NodeId) -> Vec<ClusterMember> {
        self.members
            .iter()
            .filter(|member| &member.node != local_node)
            .cloned()
            .collect()
    }

    /// Number of participants.
    // Only exercised by unit tests, hence the `dead_code` allowance.
    #[allow(dead_code)]
    fn len(&self) -> usize {
        self.members.len()
    }
}
/// Storage layout of a segment.
///
/// Serialized as an internally tagged object whose `type` field is
/// `"metadata"`, `"replicated"` or `"dispersed"`.
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Storage {
    #[serde(rename = "metadata")]
    Metadata,
    #[serde(rename = "replicated")]
    Replicated(ReplicatedConfig),
    #[serde(rename = "dispersed")]
    Dispersed(DispersedConfig),
}
impl Storage {
pub fn is_metadata(&self) -> bool {
matches!(*self, Storage::Metadata)
}
}
/// Parameters of the replicated storage layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicatedConfig {
    /// Configured number of tolerable faults.
    pub tolerable_faults: u8,
}
/// Parameters of the dispersed storage layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DispersedConfig {
    /// Configured number of tolerable faults.
    pub tolerable_faults: u8,
    /// Configured fragment count (also exposed via `fragments()`).
    pub fragments: u8,
}
impl DispersedConfig {
    /// Returns the configured fragment count.
    pub fn fragments(&self) -> u8 {
        self.fragments
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use frugalos_raft::LocalNodeId;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use trackable::result::TestResult;

    /// Object version used by the lump-id round-trip tests.
    const OBJECT_VERSION: u64 = 0x1234_5678_9abc_def0;

    /// Expected lump id for node `1000a00` and `OBJECT_VERSION`.
    #[allow(clippy::inconsistent_digit_grouping)]
    const EXPECTED_LUMP_ID: u128 = (1 << 120) | (0x100_0a00_00 << 64) | 0x1234_5678_9abc_def0;

    /// Builds a member whose local id ends in `n` and whose device is named after `n`.
    fn make_member(n: u8) -> ClusterMember {
        let loopback = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
        ClusterMember {
            node: NodeId {
                local_id: LocalNodeId::new([0, 0, 0, 0, 0, 0, n]),
                instance: 0,
                addr: loopback,
            },
            device: n.to_string(),
        }
    }

    /// Builds a cluster whose members are numbered `0..size`.
    fn make_cluster(size: u8) -> ClusterConfig {
        ClusterConfig {
            members: (0..size).map(make_member).collect(),
        }
    }

    /// Lists the device names of the candidates for `version`, in iteration order.
    fn collect_devices(cluster: &ClusterConfig, version: ObjectVersion) -> Vec<String> {
        cluster
            .candidates(version)
            .map(|member| member.device.clone())
            .collect()
    }

    #[test]
    fn cluster_config_works() {
        let cluster = make_cluster(5);
        let devices = collect_devices(&cluster, ObjectVersion(1));
        assert_eq!(devices, vec!["3", "4", "0", "1", "2"]);
    }

    #[test]
    fn participants_works() -> TestResult {
        let fragments: u8 = 3;
        let cluster = make_cluster(5);
        let candidates: Vec<ClusterMember> =
            cluster.candidates(ObjectVersion(1)).cloned().collect();
        let participants = Participants::dispersed(&candidates, fragments);
        assert_eq!(participants.len(), usize::from(fragments));

        // (device, is_participant) for each candidate, in candidate order.
        let expected_membership = [
            ("3", true),
            ("4", true),
            ("0", true),
            ("1", false),
            ("2", false),
        ];
        assert_eq!(candidates.len(), expected_membership.len());
        for (member, &(device, is_participant)) in candidates.iter().zip(&expected_membership) {
            assert_eq!(member.device, device);
            assert_eq!(
                participants.fragment_index(&member.node).is_some(),
                is_participant
            );
        }

        // (candidate index, expected spare devices) pairs.
        let spare_cases = vec![
            (0, vec!["4", "0"]),
            (1, vec!["3", "0"]),
            (2, vec!["3", "4"]),
            (3, vec!["3", "4", "0"]),
        ];
        for (index, expected) in spare_cases {
            let spares: Vec<String> = participants
                .spares(&candidates[index].node)
                .into_iter()
                .map(|member| member.device)
                .collect();
            assert_eq!(expected, spares);
        }
        Ok(())
    }

    #[test]
    fn make_lump_id_works() -> TestResult {
        use std::str::FromStr;
        let node = NodeId::from_str("1000a00.0@127.0.0.1:14278")?;
        let lump_id = make_lump_id(&node, ObjectVersion(OBJECT_VERSION));
        assert_eq!(lump_id.as_u128(), EXPECTED_LUMP_ID);
        Ok(())
    }

    #[test]
    fn get_object_version_from_lump_id_works() -> TestResult {
        let lump_id = LumpId::new(EXPECTED_LUMP_ID);
        assert_eq!(
            get_object_version_from_lump_id(lump_id),
            ObjectVersion(OBJECT_VERSION)
        );
        Ok(())
    }
}