use super::*;
impl_veilid_log_facility!("fanout");
/// Mutable state shared by all lanes of a single fanout run.
///
/// `FanoutCall::run` creates one instance behind a mutex; lanes lock it to
/// request work and to record the outcome of each call.
#[derive(Debug)]
struct FanoutContext<'a> {
/// Queue of candidate nodes and their per-node status.
fanout_queue: FanoutQueue<'a>,
/// Result produced by the most recent full pass of `evaluate_done`.
result: FanoutResult,
/// Disposition returned by the most recent `check_done` call. Once this is
/// anything other than `NotDone`, `evaluate_done` returns it unchanged.
done: FanoutDoneDisposition,
/// Source of the stop tokens used by the lanes and the tick future.
/// `evaluate_done` takes and drops it as soon as the fanout is done.
stop_source: Option<StopSource>,
}
/// Overall outcome classification of a fanout operation.
#[derive(Debug, Copy, Clone, Default)]
pub enum FanoutResultKind {
    /// Work was still queued or in progress before consensus was reached.
    /// This is the default value.
    #[default]
    Incomplete,
    /// The overall timeout elapsed before the fanout was declared done.
    Timeout,
    /// Enough nodes accepted to satisfy the consensus count.
    Consensus,
    /// No work remained and consensus was not reached.
    Exhausted,
}

impl FanoutResultKind {
    /// Returns `true` only for [`FanoutResultKind::Incomplete`].
    pub fn is_incomplete(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Incomplete => true,
            Self::Timeout | Self::Consensus | Self::Exhausted => false,
        }
    }
}
/// Snapshot of the outcome of a fanout, as computed by `FanoutCall::evaluate_done`.
#[derive(Clone, Debug, Default)]
pub struct FanoutResult {
/// How the fanout ended (or `Incomplete` if it has not ended yet).
pub kind: FanoutResultKind,
/// Nodes whose queue stage is `Accepted`, in the queue's sorted order.
pub consensus_nodes: Vec<NodeRef>,
/// Nodes whose queue stage is `Accepted` or `Stale`, in the queue's sorted order.
pub value_nodes: Vec<NodeRef>,
}
/// Compact textual form of a fanout result.
///
/// The plain form is `<kind letter>:<consensus node count>`; the alternate
/// form (`{:#}`) additionally lists the consensus nodes in square brackets,
/// separated by commas.
impl fmt::Display for FanoutResult {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // One-letter code for the result kind.
        let kind_code = match self.kind {
            FanoutResultKind::Incomplete => "I",
            FanoutResultKind::Timeout => "T",
            FanoutResultKind::Consensus => "C",
            FanoutResultKind::Exhausted => "E",
        };
        let consensus_len = self.consensus_nodes.len();

        if !f.alternate() {
            return write!(f, "{}:{}", kind_code, consensus_len);
        }

        let node_list = self
            .consensus_nodes
            .iter()
            .map(|node| node.to_string())
            .collect::<Vec<_>>()
            .join(",");
        write!(f, "{}:{}[{}]", kind_code, consensus_len, node_list,)
    }
}
/// Renders a list of fanout results as rows of comma-terminated entries.
///
/// Each entry uses the plain `Display` form of [`FanoutResult`] followed by a
/// comma. Rows hold up to 32 entries, every row starts with a single space,
/// and rows are separated by a newline (no newline after the final row).
pub fn debug_fanout_results(results: &[FanoutResult]) -> String {
    const ENTRIES_PER_ROW: usize = 32;

    let total = results.len();
    let mut text = String::new();
    for (index, result) in results.iter().enumerate() {
        // Row prefix.
        if index % ENTRIES_PER_ROW == 0 {
            text.push(' ');
        }
        text.push_str(&result.to_string());
        text.push(',');

        // Break the row once it is full, unless this was the last entry.
        let emitted = index + 1;
        if emitted % ENTRIES_PER_ROW == 0 && emitted != total {
            text.push('\n');
        }
    }
    text
}
/// Successful output of one invocation of the fanout call routine against a node.
#[derive(Debug)]
pub struct FanoutCallOutput {
/// Peers returned by the called node. The fanout processor filters these
/// with the peer info filter, registers the survivors with the routing
/// table and feeds the resulting nodes back into the fanout queue.
pub peer_info_list: Vec<Arc<PeerInfo>>,
/// How the called node's answer should be recorded in the fanout queue.
pub disposition: FanoutCallDisposition,
}
/// Selects whether `FanoutCall::run` hands a throttle duration to the fanout queue.
#[derive(Debug, Clone, Copy)]
pub enum FanoutQueueMode {
/// Pass `timeout * THROTTLE_DURATION_PERCENT / 100` to the queue as its
/// throttle duration.
// NOTE(review): how the queue applies the throttle is not visible in this
// file; the name suggests it slows work once consensus is reached - confirm
// against `FanoutQueue`.
ThrottleAtConsensus,
/// Pass no throttle duration to the queue.
Unthrottled,
}
/// Percentage of the overall fanout timeout that `FanoutCall::run` passes to
/// the fanout queue as the throttle duration in `ThrottleAtConsensus` mode.
const THROTTLE_DURATION_PERCENT: u64 = 33;
/// Outcome reported by the call routine for a single node; decides how the
/// fanout processor records that node in the fanout queue.
#[derive(Debug, Copy, Clone)]
pub enum FanoutCallDisposition {
/// Recorded via the queue's `timeout`.
Timeout,
/// Recorded via the queue's `disqualified`.
Invalid,
/// Recorded via the queue's `rejected`.
Rejected,
/// Recorded via the queue's `stale`.
Stale,
/// Recorded via the queue's `accepted`.
Accepted,
/// All previously accepted nodes are moved back to queued
/// (`all_accepted_to_queued`), then this node is recorded as accepted.
AcceptedNewerRestart,
/// All previously accepted nodes are marked stale
/// (`all_accepted_to_stale`), then this node is recorded as accepted.
AcceptedNewer,
}
/// Verdict returned by the `FanoutCheckDone` callback after each evaluation.
///
/// Any value other than `NotDone` makes `evaluate_done` drop the stop source
/// and becomes the answer for all later evaluations.
#[derive(Debug, Copy, Clone)]
pub enum FanoutDoneDisposition {
/// The fanout is finished; the lane that observes this returns
/// `FanoutProcessorReturn::DoneEarly`, which ends the run loop immediately.
DoneEarly,
/// The fanout is finished; the lane that observes this exits while the
/// run loop keeps draining the remaining lanes.
// Not constructed in this file, hence the lint allowance.
#[allow(dead_code)]
Done,
/// The fanout should continue.
NotDone,
}
/// Value produced by each future driven inside `FanoutCall::run`
/// (the lanes and the periodic tick future).
enum FanoutProcessorReturn {
/// A lane saw `FanoutDoneDisposition::DoneEarly`; the run loop stops at once.
DoneEarly,
/// A lane (or the tick future, when stopped) finished; the run loop keeps
/// polling whatever futures remain.
Done,
/// The tick interval elapsed; the run loop asks the queue to send more
/// work and schedules another tick if other futures are still pending.
Tick,
}
/// Result of one call routine invocation: the call output or an RPC error.
pub type FanoutCallResult = Result<FanoutCallOutput, RPCError>;
/// Shared predicate deciding whether a peer is eligible for the fanout.
/// Applied both to routing table entries when seeding the queue and to the
/// peers returned by each call.
pub type FanoutPeerInfoFilter = Arc<dyn (Fn(Arc<PeerInfo>) -> bool) + Send + Sync>;
/// Shared callback that inspects the freshly computed `FanoutResult` and
/// decides whether the fanout is done.
pub type FanoutCheckDone = Arc<dyn (Fn(&FanoutResult) -> FanoutDoneDisposition) + Send + Sync>;
/// Shared callback that performs the actual call against one node and
/// returns a boxed future resolving to a `FanoutCallResult`.
pub type FanoutCallRoutine =
Arc<dyn (Fn(NodeRef) -> PinBoxFutureStatic<FanoutCallResult>) + Send + Sync>;
/// Returns a peer info filter that accepts every peer.
pub fn empty_fanout_peer_info_filter() -> FanoutPeerInfoFilter {
    Arc::new(|_peer_info| true)
}
pub fn capability_fanout_peer_info_filter(caps: Vec<VeilidCapability>) -> FanoutPeerInfoFilter {
Arc::new(move |pi| pi.node_info().has_all_capabilities(&caps))
}
/// A configured fanout operation: repeatedly calls nodes close to a hash
/// coordinate until a completion callback says the operation is done, the
/// queue has no more work, or the overall timeout elapses.
pub(crate) struct FanoutCall<'a> {
/// Routing table used to find the initial closest nodes and to register
/// peers returned by calls.
routing_table: &'a RoutingTable,
/// Name used as a prefix in log messages and as the fanout queue's name.
name: String,
/// Coordinate that nodes are sorted against and searched around.
hash_coordinate: HashCoordinate,
/// Number of closest nodes requested from the routing table to seed the queue.
node_count: usize,
/// Number of concurrent lanes (processor futures) to run.
fanout_tasks: usize,
/// Number of accepted nodes required for a `Consensus` result.
consensus_count: usize,
/// Passed through to the fanout queue.
// NOTE(review): exact meaning is defined by `FanoutQueue` - confirm there.
consensus_width: usize,
/// Overall time limit for `run`.
timeout: TimestampDuration,
/// Predicate selecting which peers may participate.
peer_info_filter: FanoutPeerInfoFilter,
/// Callback performing the call against a single node.
call_routine: FanoutCallRoutine,
/// Callback deciding whether the fanout is done.
check_done: FanoutCheckDone,
}
/// Exposes the component registry of the routing table this call runs against.
impl VeilidComponentRegistryAccessor for FanoutCall<'_> {
    fn registry(&self) -> VeilidComponentRegistry {
        let routing_table = self.routing_table;
        routing_table.registry()
    }
}
/// Plain-data parameters for constructing a `FanoutCall`; each field is
/// copied into the field of the same name by `FanoutCall::new`.
pub(crate) struct FanoutCallParams {
/// Name used in log messages and for the fanout queue.
pub name: String,
/// Coordinate that nodes are sorted against and searched around.
pub hash_coordinate: HashCoordinate,
/// Number of closest nodes requested from the routing table to seed the queue.
pub node_count: usize,
/// Number of concurrent lanes to run.
pub fanout_tasks: usize,
/// Number of accepted nodes required for a `Consensus` result.
pub consensus_count: usize,
/// Passed through to the fanout queue.
// NOTE(review): exact meaning is defined by `FanoutQueue` - confirm there.
pub consensus_width: usize,
/// Overall time limit for the fanout.
pub timeout: TimestampDuration,
}
impl<'a> FanoutCall<'a> {
pub fn new(
routing_table: &'a RoutingTable,
params: FanoutCallParams,
peer_info_filter: FanoutPeerInfoFilter,
call_routine: FanoutCallRoutine,
check_done: FanoutCheckDone,
) -> Self {
Self {
routing_table,
name: params.name,
hash_coordinate: params.hash_coordinate,
node_count: params.node_count,
fanout_tasks: params.fanout_tasks,
consensus_count: params.consensus_count,
consensus_width: params.consensus_width,
timeout: params.timeout,
peer_info_filter,
call_routine,
check_done,
}
}
/// Recomputes the fanout result from the queue and asks `check_done`
/// whether the fanout is finished.
///
/// A verdict other than `NotDone` is sticky: it is stored in the context and
/// returned unchanged by every later call without re-scanning the queue.
/// When the verdict first becomes something other than `NotDone`, the
/// context's stop source is dropped.
///
/// The result kind is decided while walking the nodes in the queue's sorted
/// order, and the first decisive event wins:
/// * a queued or in-progress node is seen first: `Incomplete`;
/// * the number of accepted nodes reaches `consensus_count` first: `Consensus`;
/// * neither happens: `Exhausted`.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "fanout", skip_all, fields(__VEILID_LOG_KEY = self.log_key()))
)]
fn evaluate_done(&self, ctx: &mut FanoutContext) -> FanoutDoneDisposition {
    // Already decided earlier; keep that answer.
    match ctx.done {
        FanoutDoneDisposition::NotDone => {}
        settled => return settled,
    }

    let fanout_result = ctx.fanout_queue.with_nodes(|nodes, sorted_nodes| {
        // None = undecided, Some(true) = consensus, Some(false) = work pending.
        let mut reached: Option<bool> = None;
        let mut accepted: Vec<NodeRef> = Vec::new();
        let mut holders: Vec<NodeRef> = Vec::new();

        for key in sorted_nodes {
            let entry = nodes.get(key).unwrap_or_log();
            match entry.status.stage() {
                FanoutNodeStage::Queued | FanoutNodeStage::InProgress => {
                    // Pending work before consensus means we are not done yet.
                    reached = reached.or(Some(false));
                }
                FanoutNodeStage::Timeout
                | FanoutNodeStage::Rejected
                | FanoutNodeStage::Disqualified => {
                    // These nodes contribute nothing to the result.
                }
                FanoutNodeStage::Stale => {
                    holders.push(entry.work_item.node_ref.clone());
                }
                FanoutNodeStage::Accepted => {
                    holders.push(entry.work_item.node_ref.clone());
                    accepted.push(entry.work_item.node_ref.clone());
                    if reached.is_none() && accepted.len() >= self.consensus_count {
                        reached = Some(true);
                    }
                }
            }
        }

        let kind = match reached {
            Some(true) => FanoutResultKind::Consensus,
            Some(false) => FanoutResultKind::Incomplete,
            None => FanoutResultKind::Exhausted,
        };
        FanoutResult {
            kind,
            consensus_nodes: accepted,
            value_nodes: holders,
        }
    });

    let verdict = (self.check_done)(&fanout_result);
    ctx.result = fanout_result;
    ctx.done = verdict;

    // Finished: release the stop source so its tokens are no longer backed by it.
    if !matches!(verdict, FanoutDoneDisposition::NotDone) {
        ctx.stop_source = None;
    }
    verdict
}
/// Runs one lane of the fanout: repeatedly requests a work item from the
/// queue, calls the node, records the outcome and re-evaluates completion.
///
/// Returns:
/// * `Ok(Done)` when no work item could be received (the receive failed or
///   the stop token fired), or when `evaluate_done` reported `Done`;
/// * `Ok(DoneEarly)` when `evaluate_done` reported `DoneEarly`;
/// * `Err(_)` when the stop source is missing, requesting work fails, or the
///   call routine returns an error.
///
/// A call that is cancelled through the work item's cancel token is simply
/// skipped and the lane asks for more work.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "fanout", skip_all, fields(__VEILID_LOG_KEY = self.log_key()))
)]
async fn fanout_processor(
    &self,
    lane_name: String,
    context: &Mutex<FanoutContext<'_>>,
) -> Result<FanoutProcessorReturn, RPCError> {
    let stop_token = context
        .lock()
        .stop_source
        .as_ref()
        .ok_or_else(|| RPCError::internal("should have stop source"))?
        .token();

    loop {
        // Hold the lock only long enough to register the request for work.
        let work_receiver = {
            let mut context_locked = context.lock();
            veilid_log!(self debug "{}[{}]: Requesting work", self.name, lane_name);
            context_locked
                .fanout_queue
                .request_work(lane_name.clone())?
        };

        // Wait for a work item, giving up once the fanout is stopped.
        let work_item = match work_receiver
            .recv_async()
            .timeout_at(stop_token.clone())
            .await
        {
            Ok(Ok(item)) => item,
            _ => {
                veilid_log!(self debug "{}[{}]: Lane done", self.name, lane_name);
                return Ok(FanoutProcessorReturn::Done);
            }
        };

        // Perform the call, allowing the queue to cancel it.
        let work_node = work_item.node_ref.clone();
        let cancel_stop_token = work_item.cancel_stop_token.clone();
        let call_outcome = (self.call_routine)(work_node.clone())
            .timeout_at(cancel_stop_token)
            .await;

        let output = match call_outcome {
            Ok(Ok(output)) => output,
            Ok(Err(e)) => {
                veilid_log!(self debug "{}[{}]: Error occurred, terminating fanout: {}", self.name, lane_name, e);
                return Err(e);
            }
            Err(_) => {
                veilid_log!(self debug "{}[{}]: Work cancelled, lane checking for more work", self.name, lane_name);
                continue;
            }
        };

        // Keep only eligible peers and make them known to the routing table.
        let filtered_v: Vec<Arc<PeerInfo>> = output
            .peer_info_list
            .into_iter()
            .filter(|pi| (self.peer_info_filter)(pi.clone()))
            .collect();
        let new_nodes = self
            .routing_table
            .register_nodes_with_peer_info_list(filtered_v);

        // Record the outcome and re-evaluate completion under the lock.
        {
            let mut context_locked = context.lock();
            let cur_ts = Timestamp::now_non_decreasing();

            let queue = &mut context_locked.fanout_queue;
            match output.disposition {
                FanoutCallDisposition::Timeout => {
                    queue.timeout(work_node, cur_ts);
                }
                FanoutCallDisposition::Invalid => {
                    queue.disqualified(work_node, cur_ts);
                }
                FanoutCallDisposition::Rejected => {
                    queue.rejected(work_node, cur_ts);
                }
                FanoutCallDisposition::Stale => {
                    queue.stale(work_node, cur_ts);
                }
                FanoutCallDisposition::Accepted => {
                    queue.accepted(work_node, cur_ts);
                }
                FanoutCallDisposition::AcceptedNewerRestart => {
                    queue.all_accepted_to_queued(cur_ts);
                    queue.accepted(work_node, cur_ts);
                }
                FanoutCallDisposition::AcceptedNewer => {
                    queue.all_accepted_to_stale(cur_ts);
                    queue.accepted(work_node, cur_ts);
                }
            }
            queue.update(&new_nodes, cur_ts);

            match self.evaluate_done(&mut context_locked) {
                FanoutDoneDisposition::DoneEarly => {
                    veilid_log!(self debug "{}[{}]: Fanout done, terminating all other lanes", self.name, lane_name);
                    return Ok(FanoutProcessorReturn::DoneEarly);
                }
                FanoutDoneDisposition::Done => {
                    veilid_log!(self debug "{}[{}]: Fanout done, letting other lanes complete", self.name, lane_name);
                    return Ok(FanoutProcessorReturn::Done);
                }
                FanoutDoneDisposition::NotDone => {
                    veilid_log!(self debug "{}[{}]: Work done, lane checking for more work", self.name, lane_name);
                }
            }
        }
    }
}
/// Seeds the fanout queue with the preferred closest nodes to the hash
/// coordinate, as reported by the routing table.
///
/// A routing table entry is eligible only if it has a snapshot, has peer
/// info in the `PublicInternet` routing domain, that peer info carries at
/// least one signature, and it passes the peer info filter. Up to
/// `node_count` nodes are requested.
///
/// Currently always returns `Ok(())`.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "fanout", skip_all, fields(__VEILID_LOG_KEY = self.log_key()))
)]
fn init_closest_nodes(
    &self,
    context: &mut FanoutContext,
    cur_ts: Timestamp,
) -> Result<(), RPCError> {
    let peer_info_filter = self.peer_info_filter.clone();
    let entry_filter = Box::new(
        move |opt_snap: &Option<BucketEntrySnapshot>, _cur_ts: Timestamp| {
            let peer_info = match opt_snap
                .as_ref()
                .and_then(|snap| snap.get_peer_info(RoutingDomain::PublicInternet))
            {
                Some(peer_info) => peer_info,
                None => return false,
            };
            // Signature check first, then the caller-supplied filter.
            !peer_info.signatures().is_empty() && (peer_info_filter)(peer_info)
        },
    ) as RoutingTableEntryFilter;

    let mut filters = VecDeque::new();
    filters.push_back(entry_filter);

    // Entries that pass the filter always have a snapshot.
    let to_node_ref =
        |opt_snap: Option<BucketEntrySnapshot>| opt_snap.unwrap_or_log().node_ref.clone();

    let closest_nodes = self.routing_table.get_preferred_closest_nodes(
        self.node_count,
        self.hash_coordinate.clone(),
        filters,
        to_node_ref,
    );

    context.fanout_queue.update(&closest_nodes, cur_ts);
    Ok(())
}
/// Executes the fanout and returns its final result.
///
/// Steps:
/// 1. Build the shared context, including a fanout queue sorted by
///    closeness to the hash coordinate.
/// 2. Seed the queue with the routing table's closest nodes and with
///    `init_fanout_queue`; return immediately if that already satisfies
///    `check_done`.
/// 3. Drive `fanout_tasks` lanes plus a periodic tick future, all bounded
///    by the overall timeout.
///
/// * `init_fanout_queue` - extra nodes to add to the queue before starting.
/// * `fanout_queue_mode` - whether the queue is given a throttle duration.
///
/// On overall timeout, all unfinished nodes are marked as timed out and the
/// result is re-evaluated; if `check_done` still reports `NotDone`, the
/// returned result's kind is forced to `Timeout`.
///
/// # Errors
/// Returns an error if the timeout cannot be expressed as `u32`
/// milliseconds, if the stop source is unexpectedly missing, or if seeding
/// the queue or any lane fails.
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "fanout", skip_all, fields(__VEILID_LOG_KEY = self.log_key()))
)]
pub async fn run(
&self,
init_fanout_queue: Vec<NodeRef>,
fanout_queue_mode: FanoutQueueMode,
) -> Result<FanoutResult, RPCError> {
// Nodes are ordered by closeness to the hash coordinate.
let node_sort = Box::new(RoutingTable::make_closest_node_id_sort(
self.hash_coordinate.clone(),
));
// Shared state for all lanes.
let context = Arc::new(Mutex::new(FanoutContext {
fanout_queue: FanoutQueue::new(
self.name.clone(),
self.routing_table.registry(),
self.hash_coordinate.kind(),
node_sort,
self.consensus_count,
self.consensus_width,
// Throttle duration: a fixed percentage of the overall timeout, or none.
match fanout_queue_mode {
FanoutQueueMode::ThrottleAtConsensus => Some(
self.timeout
.saturating_mul(THROTTLE_DURATION_PERCENT)
.div(100),
),
FanoutQueueMode::Unthrottled => None,
},
),
result: FanoutResult {
kind: FanoutResultKind::Incomplete,
consensus_nodes: vec![],
value_nodes: vec![],
},
done: FanoutDoneDisposition::NotDone,
stop_source: Some(StopSource::new()),
}));
let timeout_ms = self.timeout.millis_u32().map_err(RPCError::internal)?;
// Seed the queue; the lock is released at the end of this block.
{
let context_locked = &mut *context.lock();
let cur_ts = Timestamp::now_non_decreasing();
self.init_closest_nodes(context_locked, cur_ts)?;
context_locked
.fanout_queue
.update(&init_fanout_queue, cur_ts);
// Nothing to do if the initial state already counts as done.
if !matches!(
self.evaluate_done(context_locked),
FanoutDoneDisposition::NotDone
) {
return Ok(core::mem::take(&mut context_locked.result));
}
}
// Token used to stop the tick future once the fanout is done.
let stop_token = context
.lock()
.stop_source
.as_ref()
.ok_or_else(|| RPCError::internal("should have stop source"))?
.token();
// Builds a future that yields `Tick` after `sleep(100)`, or `Done` if the
// stop token fires first.
// NOTE(review): the unit of `sleep`'s argument is presumably milliseconds - confirm.
let make_tick_future = || {
let stop_token = stop_token.clone();
pin_dyn_future!(async move {
if sleep(100).timeout_at(stop_token).await.is_err() {
return Ok(FanoutProcessorReturn::Done);
}
Ok(FanoutProcessorReturn::Tick)
})
};
// One future per lane, plus the tick future.
let mut unord = FuturesUnordered::new();
{
for n in 0..self.fanout_tasks {
unord.push(pin_dyn_future!(
self.fanout_processor(format!("lane#{}", n), &context)
));
}
unord.push(make_tick_future());
}
// Drive all futures until they finish, one reports DoneEarly or an error,
// or the overall timeout elapses.
match timeout(
timeout_ms,
async {
loop {
if let Some(res) = unord.next().in_current_span().await {
match res {
Ok(FanoutProcessorReturn::DoneEarly) => {
// Stop immediately; remaining futures are abandoned.
break Ok(());
}
Ok(FanoutProcessorReturn::Done) => {
// One future finished; keep polling the rest.
}
Ok(FanoutProcessorReturn::Tick) => {
let context_locked = &mut *context.lock();
let cur_ts = Timestamp::now_non_decreasing();
context_locked.fanout_queue.send_more_work(cur_ts);
// Only keep ticking while other futures are still pending.
if !unord.is_empty() {
unord.push(make_tick_future());
}
}
Err(e) => {
break Err(e);
}
}
} else {
// All futures have completed.
break Ok(());
}
}
}
.in_current_span(),
)
.await
{
Ok(Ok(())) => {
// Finished within the timeout: hand back the last evaluated result.
let context_locked = &mut *context.lock();
veilid_log!(self debug "{}: Finished FanoutQueue:\n{}", self.name, context_locked.fanout_queue);
Ok(core::mem::take(&mut context_locked.result))
}
Ok(Err(e)) => {
// A lane failed.
Err(e)
}
Err(_) => {
// Overall timeout: mark unfinished nodes as timed out and re-evaluate.
let context_locked = &mut *context.lock();
let cur_ts = Timestamp::now_non_decreasing();
context_locked
.fanout_queue
.all_unfinished_to_timeout(cur_ts);
veilid_log!(self debug "{}: Timeout FanoutQueue:\n{}", self.name, context_locked.fanout_queue);
// If the result now counts as done, return it with its computed kind.
if !matches!(
self.evaluate_done(context_locked),
FanoutDoneDisposition::NotDone,
) {
return Ok(core::mem::take(&mut context_locked.result));
}
// Otherwise report the result as a timeout.
let mut result = core::mem::take(&mut context_locked.result);
result.kind = FanoutResultKind::Timeout;
Ok(result)
}
}
}
}