use super::*;
impl_veilid_log_facility!("fanout");
impl_veilid_component_accessors!(FanoutQueue<'_>);
/// Lifecycle stage of a single node inside a fanout operation.
///
/// `Queued` and `InProgress` are the only unfinished stages; every other
/// variant is reported as finished by [`FanoutNodeStatus::is_finished`].
///
/// The enum is fieldless, so it derives `PartialEq`/`Eq` to allow direct
/// `==` comparisons in addition to pattern matching.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum FanoutNodeStage {
    /// Known to the queue but not yet handed to a worker.
    Queued,
    /// Handed to a worker; no outcome recorded yet.
    InProgress,
    /// Finished without an answer (explicit timeout, or the whole fanout was
    /// wound down while the node was still unfinished).
    Timeout,
    /// Finished: the node rejected the request. Runs of rejections are what
    /// drive disqualification of the remaining nodes.
    Rejected,
    /// Finished: the node accepted the request.
    Accepted,
    /// Finished: the node was marked stale (directly, or by demoting a
    /// previously accepted node).
    Stale,
    /// Finished: the node was ruled out before or while being processed.
    Disqualified,
}
/// Current stage of a fanout node together with the history of how it got
/// there.
///
/// Each call to `transition` pushes the outgoing status onto the
/// `prev_status` chain, so the chain reads newest-to-oldest.
#[derive(Debug, Clone)]
pub struct FanoutNodeStatus {
/// Stage the node is in right now.
stage: FanoutNodeStage,
/// Status the node had before the most recent transition, if any.
prev_status: Option<Box<FanoutNodeStatus>>,
/// Timestamp at which `stage` was entered.
transition_ts: Timestamp,
/// Timestamp of the last `touch` (or of the transition itself); only
/// advanced by `touch` while the stage is unfinished.
touch_ts: Timestamp,
}
impl FanoutNodeStatus {
    /// Returns the stage the node is currently in.
    pub fn stage(&self) -> FanoutNodeStage {
        self.stage
    }

    /// Returns `true` once the node has left the `Queued`/`InProgress` stages.
    pub fn is_finished(&self) -> bool {
        // Exhaustive on purpose: adding a stage must force a decision here.
        match self.stage {
            FanoutNodeStage::Queued | FanoutNodeStage::InProgress => false,
            FanoutNodeStage::Timeout
            | FanoutNodeStage::Rejected
            | FanoutNodeStage::Disqualified
            | FanoutNodeStage::Accepted
            | FanoutNodeStage::Stale => true,
        }
    }

    /// Creates the initial status: `Queued`, with no history, entered and
    /// last touched at `timestamp`.
    pub fn queued(timestamp: Timestamp) -> Self {
        FanoutNodeStatus {
            stage: FanoutNodeStage::Queued,
            prev_status: None,
            transition_ts: timestamp,
            touch_ts: timestamp,
        }
    }

    /// Moves the node into `stage` at `timestamp`, recording the outgoing
    /// status as the new head of the history chain.
    ///
    /// The outgoing status is touched at `timestamp` so its recorded duration
    /// covers the full time spent in that stage.
    pub fn transition(&mut self, stage: FanoutNodeStage, timestamp: Timestamp) {
        // Move the existing history under the outgoing snapshot instead of
        // deep-cloning the whole chain on every transition.
        let prev_status = Box::new(FanoutNodeStatus {
            stage: self.stage,
            prev_status: self.prev_status.take(),
            transition_ts: self.transition_ts,
            touch_ts: timestamp,
        });
        self.stage = stage;
        self.prev_status = Some(prev_status);
        self.transition_ts = timestamp;
        self.touch_ts = timestamp;
    }

    /// Records that the node was still in its current stage at `timestamp`.
    ///
    /// Finished stages are frozen: touching them leaves `touch_ts` unchanged.
    pub fn touch(&mut self, timestamp: Timestamp) {
        match self.stage {
            FanoutNodeStage::Queued | FanoutNodeStage::InProgress => {
                self.touch_ts = timestamp;
            }
            FanoutNodeStage::Timeout
            | FanoutNodeStage::Rejected
            | FanoutNodeStage::Accepted
            | FanoutNodeStage::Stale
            | FanoutNodeStage::Disqualified => {
                // Finished: the recorded duration must not keep growing.
            }
        }
    }
}
impl fmt::Display for FanoutNodeStatus {
    /// Renders the status as `Stage[(time in stage)]` followed by either
    /// `<--previous status` (recursively) or, for the oldest entry,
    /// `@timestamp of entry`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self.stage)?;

        // Only show a duration when the stage was observed after it began.
        if self.transition_ts != self.touch_ts {
            write!(f, "({})", self.touch_ts.duration_since(self.transition_ts))?;
        }

        match &self.prev_status {
            Some(prev_status) => write!(f, "<--{}", prev_status),
            None => write!(f, "@{}", self.transition_ts),
        }
    }
}
/// Per-node bookkeeping kept by the fanout queue.
#[derive(Debug)]
pub struct FanoutNode {
/// Work item cloned and sent to a worker when this node is dispatched.
pub work_item: FanoutWorkItem,
/// Source side of the work item's stop token. It is taken and dropped when
/// the node is disqualified by the queue, after which it is `None`.
pub cancel_stop_source: Option<StopSource>,
/// Current stage and stage history of the node.
pub status: FanoutNodeStatus,
}
impl fmt::Display for FanoutNode {
    /// Renders `node_ref[ (total duration)]: status`, where the total duration
    /// spans from the oldest recorded transition to the latest touch and is
    /// omitted when it is zero.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Follow the history chain back to the very first status.
        let mut oldest = &self.status;
        while let Some(older) = &oldest.prev_status {
            oldest = older;
        }

        let elapsed = self.status.touch_ts.duration_since(oldest.transition_ts);

        write!(f, "{}", self.work_item.node_ref)?;
        if !elapsed.is_zero() {
            write!(f, " ({})", elapsed)?;
        }
        write!(f, ": {}", self.status)
    }
}
/// Boxed comparator over node ids, used by the queue to order `sorted_nodes`.
/// It must be `Send` and may borrow data for the lifetime `'a`.
pub type FanoutQueueSort<'a> = Box<dyn Fn(&NodeId, &NodeId) -> core::cmp::Ordering + Send + 'a>;
/// A worker's request for the next work item, queued until the fanout queue
/// has a node to hand out.
#[derive(Debug)]
struct FanoutWorkRequest {
/// When the request was created.
request_ts: Timestamp,
/// Name of the requesting lane; only used in log output in this file.
lane_name: String,
/// Channel on which the work item is delivered back to the requester.
work_sender: FanoutWorkSender,
}
impl FanoutWorkRequest {
    /// Creates a request for `lane_name` that will be answered on
    /// `work_sender`, stamped with `Timestamp::now_non_decreasing()`.
    fn new(lane_name: String, work_sender: FanoutWorkSender) -> Self {
        let request_ts = Timestamp::now_non_decreasing();
        Self {
            request_ts,
            lane_name,
            work_sender,
        }
    }

    /// Timestamp taken when the request was created.
    pub fn request_ts(&self) -> Timestamp {
        let Self { request_ts, .. } = self;
        *request_ts
    }

    /// Owned copy of the requesting lane's name.
    pub fn lane_name(&self) -> String {
        let Self { lane_name, .. } = self;
        lane_name.clone()
    }

    /// Consumes the request, yielding the channel the work item is sent on.
    pub fn into_work_sender(self) -> FanoutWorkSender {
        let Self { work_sender, .. } = self;
        work_sender
    }
}
/// Unit of work handed to a fanout worker: which node to process, plus a
/// token tied to the queue-held `StopSource` for that node.
#[derive(Debug, Clone)]
pub struct FanoutWorkItem {
/// Node the worker should operate on.
pub node_ref: NodeRef,
/// Token obtained from the node's `cancel_stop_source`.
/// NOTE(review): presumably fires when the queue drops that source on
/// disqualification - confirm against `StopSource` semantics.
pub cancel_stop_token: StopToken,
}
/// Receiving end of the per-request channel on which a worker gets its work item.
pub type FanoutWorkReceiver = flume::Receiver<FanoutWorkItem>;
/// Sending end of the per-request channel, held by the queue until work is available.
pub type FanoutWorkSender = flume::Sender<FanoutWorkItem>;
/// Ordered set of candidate nodes for a fanout operation, plus the queue of
/// worker requests waiting to be matched with a node.
pub struct FanoutQueue<'a> {
/// Label prefixed to this queue's log messages.
name: String,
/// Component registry this queue belongs to.
registry: VeilidComponentRegistry,
/// Crypto kind used to pick each node's id out of its set of node ids.
crypto_kind: CryptoKind,
/// All nodes known to the queue, keyed by node id.
nodes: HashMap<NodeId, FanoutNode>,
/// Keys of `nodes`, kept ordered by `node_sort`.
sorted_nodes: Vec<NodeId>,
/// Comparator that defines the order of `sorted_nodes`.
node_sort: FanoutQueueSort<'a>,
/// Producer side of the pending work-request queue.
work_request_sender: flume::Sender<FanoutWorkRequest>,
/// Consumer side of the pending work-request queue.
work_request_receiver: flume::Receiver<FanoutWorkRequest>,
/// Number of consecutive rejections (in sort order) that disqualifies the
/// remaining unfinished nodes; also the throttling target in `send_more_work`.
consensus_count: usize,
/// When non-zero, unfinished nodes at or beyond this index in sort order
/// are disqualified.
consensus_width: usize,
/// When set, enables throttling in `send_more_work`; in-progress nodes
/// older than this duration are treated as slow.
opt_throttle_duration: Option<TimestampDuration>,
}
impl fmt::Debug for FanoutQueue<'_> {
    /// Debug-formats the queue, including its `name` so that several queues
    /// can be told apart in diagnostics.
    ///
    /// `registry` and `node_sort` are not printed (`node_sort` is a boxed
    /// closure and has no `Debug` impl); the output ends with `..` to show
    /// that fields were elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FanoutQueue")
            .field("name", &self.name)
            .field("crypto_kind", &self.crypto_kind)
            .field("nodes", &self.nodes)
            .field("sorted_nodes", &self.sorted_nodes)
            .field("work_request_sender", &self.work_request_sender)
            .field("work_request_receiver", &self.work_request_receiver)
            .field("consensus_count", &self.consensus_count)
            .field("consensus_width", &self.consensus_width)
            .field("opt_throttle_duration", &self.opt_throttle_duration)
            .finish_non_exhaustive()
    }
}
impl fmt::Display for FanoutQueue<'_> {
    /// Lists every node in sort order, one per line, as `id: status` under a
    /// `nodes:` heading.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("nodes:\n")?;
        for (position, node_id) in self.sorted_nodes.iter().enumerate() {
            // Newline goes between entries, never after the last one.
            if position > 0 {
                f.write_str("\n")?;
            }
            let node = self.nodes.get(node_id).unwrap_or_log();
            write!(f, " {}: {}", node_id, node.status)?;
        }
        Ok(())
    }
}
impl<'a> FanoutQueue<'a> {
/// Creates an empty fanout queue.
///
/// * `name` - label used in this queue's log messages
/// * `registry` - component registry the queue reports through
/// * `crypto_kind` - selects which of a node's ids keys it in the queue
/// * `node_sort` - comparator defining the processing order of nodes
/// * `consensus_count` / `consensus_width` - limits used when dispatching
///   work and disqualifying nodes
/// * `opt_throttle_duration` - enables throttled dispatch when `Some`
///
/// An unbounded channel is created internally to hold pending work requests.
pub fn new(
    name: String,
    registry: VeilidComponentRegistry,
    crypto_kind: CryptoKind,
    node_sort: FanoutQueueSort<'a>,
    consensus_count: usize,
    consensus_width: usize,
    opt_throttle_duration: Option<TimestampDuration>,
) -> Self {
    let (work_request_sender, work_request_receiver) = flume::unbounded();
    let nodes = HashMap::new();
    let sorted_nodes = Vec::new();
    Self {
        name,
        registry,
        crypto_kind,
        nodes,
        sorted_nodes,
        node_sort,
        work_request_sender,
        work_request_receiver,
        consensus_count,
        consensus_width,
        opt_throttle_duration,
    }
}
/// Registers a request for work on behalf of `lane_name` and returns the
/// receiver on which the work item will arrive.
///
/// The request is queued and `send_more_work` is run immediately (using the
/// request's own timestamp), so the item may already be waiting in the
/// returned single-slot channel.
///
/// # Errors
/// Returns an internal `RPCError` if the request cannot be queued.
pub fn request_work(&mut self, lane_name: String) -> Result<FanoutWorkReceiver, RPCError> {
    let (work_sender, work_receiver) = flume::bounded(1);

    let request = FanoutWorkRequest::new(lane_name, work_sender);
    let request_ts = request.request_ts();

    if let Err(send_error) = self.work_request_sender.send(request) {
        return Err(RPCError::internal(send_error));
    }

    // Try to satisfy the request right away.
    self.send_more_work(request_ts);

    Ok(work_receiver)
}
/// Adds any not-yet-known nodes from `new_nodes` as `Queued`, re-sorts the
/// queue, applies disqualification rules, and touches every node at `cur_ts`.
///
/// Nodes that have no id for this queue's crypto kind, or that are already
/// tracked, are skipped.
pub fn update(&mut self, new_nodes: &[NodeRef], cur_ts: Timestamp) {
    for node_ref in new_nodes {
        // Only nodes with an id of our crypto kind that we have not seen yet.
        let key = match node_ref.node_ids().get(self.crypto_kind) {
            Some(key) if !self.nodes.contains_key(&key) => key,
            _ => continue,
        };

        // Each node gets its own stop source; the token travels with the work item.
        let cancel_stop_source = StopSource::new();
        let work_item = FanoutWorkItem {
            node_ref: node_ref.clone(),
            cancel_stop_token: cancel_stop_source.token(),
        };
        let node = FanoutNode {
            work_item,
            cancel_stop_source: Some(cancel_stop_source),
            status: FanoutNodeStatus::queued(cur_ts),
        };

        self.nodes.insert(key.clone(), node);
        self.sorted_nodes.push(key);
    }

    // Restore the processing order now that new ids may have been appended.
    let node_sort = &self.node_sort;
    self.sorted_nodes.sort_by(|a, b| node_sort(a, b));

    // Rule out nodes that can no longer contribute.
    self.disqualify(cur_ts);

    // Refresh the touch time of everything (finished nodes ignore this).
    let nodes = &mut self.nodes;
    for node_id in self.sorted_nodes.iter() {
        let node = nodes.get_mut(node_id).unwrap_or_log();
        node.status.touch(cur_ts);
    }

    veilid_log!(self debug
        "{}: FanoutQueue::update:\n{}{}\n",
        self.name,
        if new_nodes.is_empty() {
            "".to_string()
        } else {
            format!("new_nodes:{}\n",
                new_nodes.iter().map(|x| format!("\n {}", x))
                    .collect::<Vec<String>>()
                    .join(","))
        },
        self.to_string()
    );
}
/// Hands queued nodes to waiting workers, walking `sorted_nodes` in order for
/// as long as there are pending work requests.
///
/// When `opt_throttle_duration` is set, dispatch stops once the number of
/// nodes counted toward consensus reaches `consensus_count` plus the number
/// of slow nodes seen so far in this pass.
///
/// `cur_ts` is used both to measure how long in-progress nodes have been
/// running and as the transition time for nodes dispatched by this call.
pub fn send_more_work(&mut self, cur_ts: Timestamp) {
let registry = self.registry();
// Count of nodes, from the front of the sort order, considered to be working
// toward (or already part of) consensus.
let mut working_toward_consensus = 0usize;
// Cleared as soon as a queued node, or (when throttling) an in-progress node
// that is not slow, is encountered; after that nothing more is counted.
let mut counting_consensus = true;
// In-progress nodes that have exceeded the throttle duration in this pass.
let mut slow_nodes = Vec::<NodeRef>::new();
for x in &mut self.sorted_nodes {
// Nothing to do once no worker is waiting for work.
if self.work_request_receiver.is_empty() {
break;
}
// Only affects logging: set when throttling is on and slow nodes were seen.
let mut throttle_unlock = false;
if self.opt_throttle_duration.is_some() {
// Throttled: stop dispatching once enough nodes are counted, allowing one
// extra slot per slow node.
if working_toward_consensus >= (self.consensus_count + slow_nodes.len()) {
break;
} else if !slow_nodes.is_empty() {
throttle_unlock = true;
}
}
let node = self.nodes.get_mut(x).unwrap_or_log();
match node.status.stage {
FanoutNodeStage::Queued => {
counting_consensus = false;
// Pop pending requests until one of them actually takes the work item;
// a failed send just moves on to the next request.
while let Ok(work_request) = self.work_request_receiver.try_recv() {
if throttle_unlock {
veilid_log!(registry debug "{}: Throttle unlock due to {} slow nodes", self.name, slow_nodes.len());
}
let lane_name = work_request.lane_name();
let request_ts = work_request.request_ts();
let work_sender = work_request.into_work_sender();
let work_item = node.work_item.clone();
if work_sender.send(work_item).is_ok() {
node.status.transition(FanoutNodeStage::InProgress, cur_ts);
veilid_log!(registry debug "{}: Queue sent more work {} after request to {} => {}", self.name, cur_ts.duration_since(request_ts), lane_name, node.work_item.node_ref);
break;
}
}
}
FanoutNodeStage::InProgress => {
if let Some(throttle_duration) = self.opt_throttle_duration {
// Time since this node entered InProgress.
let stage_duration = cur_ts.duration_since(node.status.transition_ts);
if stage_duration > throttle_duration {
slow_nodes.push(node.work_item.node_ref.clone());
} else {
counting_consensus = false;
}
}
if counting_consensus {
working_toward_consensus += 1;
}
}
FanoutNodeStage::Accepted | FanoutNodeStage::Stale => {
if counting_consensus {
working_toward_consensus += 1;
}
// Drop any slow-node entry that refers to this same node entry.
slow_nodes.retain(|x| !x.same_entry(&node.work_item.node_ref));
}
FanoutNodeStage::Timeout
| FanoutNodeStage::Rejected
| FanoutNodeStage::Disqualified => {
// Not counted toward consensus; only the slow-node list is pruned.
slow_nodes.retain(|x| !x.same_entry(&node.work_item.node_ref));
}
}
}
}
/// Records that the in-progress node `node_ref` timed out at `cur_ts`.
///
/// # Panics
/// Panics if the node is not currently `InProgress`. The two lookups go
/// through `unwrap_or_log()`, so the node must have an id for this queue's
/// crypto kind and be present in the queue.
pub fn timeout(&mut self, node_ref: NodeRef, cur_ts: Timestamp) {
    let node_id = node_ref.node_ids().get(self.crypto_kind).unwrap_or_log();
    let node = self.nodes.get_mut(&node_id).unwrap_or_log();
    let FanoutNodeStage::InProgress = node.status.stage else {
        unreachable!("should be in progress");
    };
    node.status.transition(FanoutNodeStage::Timeout, cur_ts);
}
/// Records that the in-progress node `node_ref` rejected the request at
/// `cur_ts`, then re-evaluates disqualification, since a new rejection can
/// complete a run of consecutive rejections.
///
/// # Panics
/// Panics if the node is not currently `InProgress`. The two lookups go
/// through `unwrap_or_log()`, so the node must have an id for this queue's
/// crypto kind and be present in the queue.
pub fn rejected(&mut self, node_ref: NodeRef, cur_ts: Timestamp) {
    let node_id = node_ref.node_ids().get(self.crypto_kind).unwrap_or_log();
    let node = self.nodes.get_mut(&node_id).unwrap_or_log();
    let FanoutNodeStage::InProgress = node.status.stage else {
        unreachable!("should be in progress");
    };
    node.status.transition(FanoutNodeStage::Rejected, cur_ts);

    self.disqualify(cur_ts);
}
/// Records that the in-progress node `node_ref` accepted the request at
/// `cur_ts`.
///
/// # Panics
/// Panics if the node is not currently `InProgress`. The two lookups go
/// through `unwrap_or_log()`, so the node must have an id for this queue's
/// crypto kind and be present in the queue.
pub fn accepted(&mut self, node_ref: NodeRef, cur_ts: Timestamp) {
    let node_id = node_ref.node_ids().get(self.crypto_kind).unwrap_or_log();
    let node = self.nodes.get_mut(&node_id).unwrap_or_log();
    let FanoutNodeStage::InProgress = node.status.stage else {
        unreachable!("should be in progress");
    };
    node.status.transition(FanoutNodeStage::Accepted, cur_ts);
}
/// Records that the in-progress node `node_ref` turned out to be stale at
/// `cur_ts`.
///
/// # Panics
/// Panics if the node is not currently `InProgress`. The two lookups go
/// through `unwrap_or_log()`, so the node must have an id for this queue's
/// crypto kind and be present in the queue.
pub fn stale(&mut self, node_ref: NodeRef, cur_ts: Timestamp) {
    let node_id = node_ref.node_ids().get(self.crypto_kind).unwrap_or_log();
    let node = self.nodes.get_mut(&node_id).unwrap_or_log();
    let FanoutNodeStage::InProgress = node.status.stage else {
        unreachable!("should be in progress");
    };
    node.status.transition(FanoutNodeStage::Stale, cur_ts);
}
/// Records that the in-progress node `node_ref` was disqualified at `cur_ts`.
///
/// Unlike the queue's internal disqualification pass, this does not touch the
/// node's `cancel_stop_source`.
///
/// # Panics
/// Panics if the node is not currently `InProgress`. The two lookups go
/// through `unwrap_or_log()`, so the node must have an id for this queue's
/// crypto kind and be present in the queue.
pub fn disqualified(&mut self, node_ref: NodeRef, cur_ts: Timestamp) {
    let node_id = node_ref.node_ids().get(self.crypto_kind).unwrap_or_log();
    let node = self.nodes.get_mut(&node_id).unwrap_or_log();
    let FanoutNodeStage::InProgress = node.status.stage else {
        unreachable!("should be in progress");
    };
    node.status.transition(FanoutNodeStage::Disqualified, cur_ts);
}
/// Puts every `Accepted` node back into the `Queued` stage at `cur_ts`;
/// nodes in any other stage are left alone.
pub fn all_accepted_to_queued(&mut self, cur_ts: Timestamp) {
    for node in self.nodes.values_mut() {
        if let FanoutNodeStage::Accepted = node.status.stage {
            node.status.transition(FanoutNodeStage::Queued, cur_ts);
        }
    }
}
/// Demotes every `Accepted` node to `Stale` at `cur_ts`; nodes in any other
/// stage are left alone.
pub fn all_accepted_to_stale(&mut self, cur_ts: Timestamp) {
    for node in self.nodes.values_mut() {
        if let FanoutNodeStage::Accepted = node.status.stage {
            node.status.transition(FanoutNodeStage::Stale, cur_ts);
        }
    }
}
/// Marks every node that is still unfinished (`Queued` or `InProgress`) as
/// `Timeout` at `cur_ts`.
pub fn all_unfinished_to_timeout(&mut self, cur_ts: Timestamp) {
    for node in self.nodes.values_mut() {
        // `is_finished()` is false exactly for Queued and InProgress.
        if !node.status.is_finished() {
            node.status.transition(FanoutNodeStage::Timeout, cur_ts);
        }
    }
}
/// Walks the nodes in sort order and disqualifies unfinished ones that can no
/// longer matter.
///
/// An unfinished node (`Queued` or `InProgress`) is disqualified when either:
/// * `consensus_width` is non-zero and the node's index is at or beyond it, or
/// * a run of at least `consensus_count` rejections has been seen earlier in
///   the walk. The run is reset by any finished node that is not `Rejected`;
///   unfinished nodes do not reset it.
///
/// Disqualifying a node also drops its `cancel_stop_source`.
///
/// NOTE(review): in-progress nodes can be disqualified here, while the
/// per-node result methods (`timeout`, `rejected`, ...) panic unless the node
/// is still `InProgress`. Presumably workers observe the stop token and do not
/// report a result afterwards - confirm against the callers.
fn disqualify(&mut self, cur_ts: Timestamp) {
    let mut rejection_run = 0usize;
    let mut consensus_rejected = false;

    for (position, node_id) in self.sorted_nodes.iter().enumerate() {
        let node = self.nodes.get_mut(node_id).unwrap_or_log();

        // Finished nodes only feed the rejection run; they are never changed.
        if node.status.is_finished() {
            match node.status.stage {
                FanoutNodeStage::Rejected => {
                    rejection_run += 1;
                    consensus_rejected |= rejection_run >= self.consensus_count;
                }
                _ => rejection_run = 0,
            }
            continue;
        }

        let beyond_width = self.consensus_width > 0 && position >= self.consensus_width;
        if beyond_width || consensus_rejected {
            node.status
                .transition(FanoutNodeStage::Disqualified, cur_ts);
            // Dropping the stop source (if still held) releases it for this node.
            node.cancel_stop_source = None;
        }
    }
}
pub fn with_nodes<R, F: FnOnce(&HashMap<NodeId, FanoutNode>, &[NodeId]) -> R>(
&self,
func: F,
) -> R {
func(&self.nodes, &self.sorted_nodes)
}
}