use crate::gossip::sharded_gossip::{BandwidthThrottle, GossipType, ShardedGossip};
use crate::test_util::spawn_handler;
use crate::types::gossip::*;
use crate::types::wire;
use futures::stream::StreamExt;
use ghost_actor::dependencies::tracing;
use ghost_actor::GhostResult;
use itertools::Itertools;
use kitsune_p2p_proxy::tx2::{tx2_proxy, ProxyConfig};
use kitsune_p2p_timestamp::Timestamp;
use kitsune_p2p_types::agent_info::agent_info_helper::{AgentInfoEncode, AgentMetaInfoEncode};
use kitsune_p2p_types::agent_info::{AgentInfoInner, AgentInfoSigned};
use kitsune_p2p_types::bin_types::*;
use kitsune_p2p_types::config::KitsuneP2pTuningParams;
use kitsune_p2p_types::dht::prelude::power_and_count_from_length;
use kitsune_p2p_types::dht::spacetime::Topology;
use kitsune_p2p_types::dht::{ArqBounds, ArqStrat};
use kitsune_p2p_types::dht_arc::loc8::Loc8;
use kitsune_p2p_types::dht_arc::{DhtArc, DhtArcRange, DhtLocation};
use kitsune_p2p_types::metrics::metric_task;
use kitsune_p2p_types::tx2::tx2_api::*;
use kitsune_p2p_types::tx2::tx2_pool_promote::*;
use kitsune_p2p_types::tx2::tx2_utils::Share;
use kitsune_p2p_types::tx2::*;
use kitsune_p2p_types::*;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use tokio::task::JoinHandle;
use super::switchboard_evt_handler::SwitchboardEventHandler;
/// Reference-counted [`KitsuneSpace`].
type KSpace = Arc<KitsuneSpace>;
/// Reference-counted [`KitsuneAgent`].
type KAgent = Arc<KitsuneAgent>;
/// Reference-counted [`KitsuneOpHash`].
type KOpHash = Arc<KitsuneOpHash>;
/// Endpoint handle of a switchboard node; used as the key of
/// `SwitchboardState::nodes`.
pub type NodeEp = Tx2EpHnd<wire::Wire>;
static ZERO_SPACE: once_cell::sync::Lazy<Arc<KitsuneSpace>> =
once_cell::sync::Lazy::new(|| Arc::new(KitsuneSpace::new([0; 36].to_vec())));
/// Cloneable handle onto a set of in-memory test nodes and their shared
/// [`SwitchboardState`].
#[derive(Clone)]
pub struct Switchboard {
/// Arq strategy; `new` sets it to `ArqStrat::default()`.
pub(super) strat: ArqStrat,
/// Topology supplied to `new`.
pub(super) topology: Topology,
/// Shared mutable state; accessed through `share`.
inner: Share<SwitchboardState>,
/// Gossip type passed to the `ShardedGossip` created for each node.
gossip_type: GossipType,
}
impl Switchboard {
/// Build an empty switchboard for the given topology and gossip type.
///
/// The strategy is `ArqStrat::default()` and the shared state starts out as
/// `SwitchboardState::default()`.
pub fn new(topology: Topology, gossip_type: GossipType) -> Self {
    let strat = ArqStrat::default();
    let inner = Share::new(SwitchboardState::default());
    Self {
        strat,
        topology,
        inner,
        gossip_type,
    }
}
/// Run `f` with mutable access to the shared [`SwitchboardState`] and return
/// whatever `f` returns.
///
/// # Panics
/// Panics if the underlying `share_mut` call reports an error.
pub fn share<R, F>(&self, f: F) -> R
where
    F: FnOnce(&mut SwitchboardState) -> R,
{
    let outcome = self.inner.share_mut(|state, _close| Ok(f(state)));
    outcome.unwrap()
}
/// Add `N` nodes one after another via [`Self::add_node`], each with a clone
/// of `tuning_params`, and return their endpoint handles in creation order.
pub async fn add_nodes<const N: usize>(
    &self,
    tuning_params: KitsuneP2pTuningParams,
) -> [NodeEp; N] {
    let mut endpoints = Vec::new();
    while endpoints.len() < N {
        let ep = self.add_node(tuning_params.clone()).await;
        endpoints.push(ep);
    }
    // Exactly N handles were pushed, so the Vec -> array conversion succeeds.
    endpoints.try_into().unwrap()
}
/// Create one in-memory node, attach a `ShardedGossip` instance to it,
/// register it in the shared state and return its endpoint handle.
///
/// A background task forwards every incoming `Wire::Gossip` notification to
/// the gossip module. Any other notification payload hits `unimplemented!()`,
/// and every other endpoint event is ignored.
///
/// # Panics
/// Panics if building the transport stack or binding the endpoint fails.
pub async fn add_node(&self, tuning_params: KitsuneP2pTuningParams) -> NodeEp {
    let space = ZERO_SPACE.clone();

    // Transport stack: mem adapter -> pool promote -> proxy -> api.
    let adapter = tx2_mem_adapter(MemConfig::default()).await.unwrap();
    let pooled = tx2_pool_promote(adapter, tuning_params.clone());
    let proxied = tx2_proxy(pooled, ProxyConfig::default()).unwrap();
    let factory = tx2_api(proxied, Default::default());

    let mut ep = factory
        .bind("none:", KitsuneTimeout::from_millis(5000))
        .await
        .unwrap();
    let ep_hnd = ep.handle().clone();

    // The same event handler serves as host API and as the spawned handler.
    let evt_handler = SwitchboardEventHandler::new(ep_hnd.clone(), self.clone());
    let host_api = Arc::new(evt_handler.clone());
    let (evt_sender, handler_task) = spawn_handler(evt_handler.clone()).await;

    let bandwidth = Arc::new(BandwidthThrottle::new(1000.0, 1000.0, 10.0));
    let gossip = ShardedGossip::new(
        tuning_params,
        space.clone(),
        ep_hnd.clone(),
        evt_sender,
        host_api,
        self.gossip_type,
        bandwidth,
        Default::default(),
        kitsune_p2p_fetch::FetchPool::new_bitwise_or(),
    );
    let gossip_module = GossipModule(gossip.clone());

    // Endpoint event loop: only incoming notifications are acted upon.
    let ep_task = metric_task(async move {
        while let Some(evt) = ep.next().await {
            if let Tx2EpEvent::IncomingNotify(Tx2EpIncomingNotify { con, url, data, .. }) = evt {
                match data {
                    wire::Wire::Gossip(wire::Gossip {
                        space: _,
                        data: payload,
                        module: _,
                    }) => {
                        let bytes: Vec<u8> = payload.into();
                        let bytes: Box<[u8]> = bytes.into_boxed_slice();
                        gossip_module.incoming_gossip(con, url, bytes)?
                    }
                    _ => unimplemented!(),
                }
            }
        }
        Ok(())
    });

    // Record both background tasks and the new, still empty, node entry.
    self.share(|sb| {
        sb.metric_tasks.push(ep_task);
        sb.handler_tasks.push(handler_task);
        let entry = NodeEntry {
            local_agents: HashMap::new(),
            remote_agents: HashMap::new(),
            ops: HashMap::new(),
            gossip,
        };
        sb.nodes.insert(ep_hnd.clone(), entry);
    });
    ep_hnd
}
}
/// The mutable state behind a [`Switchboard`].
pub struct SwitchboardState {
/// Space used when building agent infos; defaults to `ZERO_SPACE`.
space: KSpace,
/// Per-node state, keyed by the node's endpoint handle.
pub(super) nodes: HashMap<NodeEp, NodeEntry>,
/// Op entries (hash and timestamp) keyed by `Loc8`.
pub(super) ops: HashMap<Loc8, OpEntry>,
/// Join handles of the per-node endpoint event loops.
metric_tasks: Vec<JoinHandle<KitsuneResult<()>>>,
/// Join handles of the per-node event handler tasks.
handler_tasks: Vec<JoinHandle<ghost_actor::GhostResult<()>>>,
}
impl Default for SwitchboardState {
    /// An empty state in `ZERO_SPACE`: no nodes, no ops and no tasks.
    fn default() -> Self {
        let space = ZERO_SPACE.clone();
        Self {
            space,
            nodes: HashMap::default(),
            ops: HashMap::default(),
            metric_tasks: Vec::default(),
            handler_tasks: Vec::default(),
        }
    }
}
impl SwitchboardState {
pub fn add_local_agent(&mut self, node_ep: &NodeEp, agent: &SwitchboardAgent) {
let SwitchboardAgent {
loc: loc8,
initial_arc,
} = agent.clone();
let agent = agent_from_loc(loc8);
let info = fake_agent_info(
self.space.clone(),
node_ep,
agent.clone(),
DhtArc::from_parts(initial_arc.canonical(), loc8.into()),
);
if let Some(existing) = self.local_agent_by_loc8(loc8) {
panic!(
"Attempted to insert two agents at the same Loc8. Existing agent info: {:?}",
existing.info
);
}
let node = self
.nodes
.get_mut(node_ep)
.expect("Node must be added first");
if let Some(existing) = node.local_agents.insert(loc8, AgentEntry::new(info)) {
panic!(
"Attempted to insert two agents at the same Loc8. Existing agent info: {:?}",
existing.info
);
}
node.gossip.local_agent_join(agent);
}
/// Print an ASCII picture of every node and of the storage arc of each of its
/// local agents to stdout.
///
/// `width` is the number of columns used for the arc rendering. When
/// `with_ops` is true, each node's own row also marks the ops that node holds.
/// Nodes are listed in order of their endpoint `uniq()` value.
///
/// Narrow rulers no longer panic: tick labels that would run past the right
/// edge are clipped, and a `width` of zero prints empty rulers. Output for
/// widths that already worked is unchanged.
pub fn print_ascii_arcs(&self, width: usize, with_ops: bool) {
    const NUM_TICKS: usize = 4;

    // Ruler carrying the numeric labels 0, 64, 128 and 192.
    let mut spaces = " ".repeat(width);
    for i in 0..NUM_TICKS {
        let n = i * 256 / NUM_TICKS;
        let t = n * width / 256;
        let s = n.to_string();
        // `t <= width` always holds, so clipping the end keeps the range valid
        // and the replacement exactly as long as the range it overwrites.
        let end = (t + s.len()).min(width);
        spaces.replace_range(t..end, &s[..end - t]);
    }

    // Sub-tick row: a dot every eighth of the width plus one in the last column.
    let mut subticks = " ".repeat(width);
    if width > 0 {
        for i in 0..NUM_TICKS * 2 {
            let t = i * width / (NUM_TICKS * 2);
            subticks.replace_range(t..=t, ".");
        }
        subticks.replace_range(width - 1..width, ".");
    }

    println!("node agent {} mid bounds", spaces);
    println!(" {}", subticks);

    let mut nodes: Vec<_> = self.nodes.iter().collect();
    nodes.sort_by_key(|(ep, _)| ep.uniq());
    for (ep, node) in nodes.into_iter() {
        let node_id = ep.uniq();
        // The node row draws an empty arc, optionally overlaid with its ops.
        let ascii = if with_ops {
            let ops = node.ops.keys().copied();
            DhtArcRange::Empty.to_ascii_with_ops(width, ops)
        } else {
            DhtArcRange::Empty.to_ascii(width)
        };
        println!(
            "{:>4} {:>+5} ({:^width$})",
            node_id,
            "",
            ascii,
            width = width
        );
        for (agent_loc8, agent) in node.local_agents.iter() {
            let interval = &agent.info.storage_arc;
            let ascii = interval.to_ascii(width);
            println!(
                "{:>4} {:>+5} |{:^width$}| {:>+4} {:?}",
                "",
                agent_loc8,
                ascii,
                interval.start_loc().as_loc8(),
                interval.map(|b| DhtLocation::as_loc8(&b)),
                width = width
            );
        }
    }
}
/// Print, for every node in `uniq()` order, the sorted list of all agent
/// locations that node knows about (local and remote) to stdout.
pub fn print_peer_lists(&self) {
    println!("node agents");
    let mut by_id: Vec<_> = self.nodes.iter().collect();
    by_id.sort_by_key(|(ep, _)| ep.uniq());
    for (ep, entry) in by_id.iter() {
        let mut known = Vec::new();
        known.extend(entry.all_agent_locs());
        known.sort();
        println!("{:>4} {:?}", ep.uniq(), known);
    }
}
/// Return the sorted locations of every agent (local and remote) known to
/// `node`.
///
/// # Panics
/// Panics if `node` is not registered.
pub fn all_peers(&mut self, node: &NodeEp) -> Vec<Loc8> {
    let entry = self.nodes.get(node).unwrap();
    let mut known = Vec::new();
    known.extend(entry.all_agent_locs());
    known.sort();
    known
}
/// Copy the agent infos of the local agents at the given locations into the
/// remote-agent table of `node`.
///
/// # Panics
/// Panics if a location does not belong to a local agent on any node, or if
/// `node` is not registered.
pub fn inject_peer_info<L: AsRef<Loc8>, A: IntoIterator<Item = L>>(
    &mut self,
    node: &NodeEp,
    agents: A,
) {
    // Gather all infos first; the target node is only borrowed mutably afterwards.
    let mut infos = Vec::new();
    for loc in agents {
        let loc8 = *loc.as_ref();
        let host = self.node_for_local_agent_loc8(loc8).unwrap();
        let info = host.local_agents.get(&loc8).unwrap().info.to_owned();
        infos.push((loc8, info));
    }
    let target = self.nodes.get_mut(node).expect("No node");
    target.remote_agents.extend(infos);
}
pub fn exchange_all_peer_info(&mut self) {
let all_agent_locs: Vec<_> = self
.nodes
.values()
.flat_map(|n| n.local_agents.keys())
.collect();
let info: Vec<(_, Vec<_>)> = self
.nodes
.iter()
.map(|(ep, n)| {
let local: HashSet<_> = n.local_agents.keys().collect();
(
ep.clone(),
all_agent_locs
.iter()
.filter(|loc| !local.contains(**loc))
.copied()
.copied()
.map(Box::new)
.collect(),
)
})
.collect();
for (node, agents) in info {
self.inject_peer_info(&node, agents);
}
}
/// Add ops with explicit timestamps to the node `node_ep`.
///
/// Each op is recorded on the node with the given `is_integrated` flag and in
/// the switchboard-wide op table with a hash derived from its location. A
/// warning is logged when a location was already present in the global table.
/// Finally the node's gossip module is told that new integrated data exists.
///
/// # Panics
/// Panics if `node_ep` is not registered.
pub fn add_ops_timed<L: Into<Loc8>, O: IntoIterator<Item = (L, Timestamp)>>(
    &mut self,
    node_ep: &NodeEp,
    is_integrated: bool,
    ops: O,
) {
    let prepared: Vec<_> = ops
        .into_iter()
        .map(|(l, timestamp)| {
            let loc8: Loc8 = l.into();
            (loc8, op_hash_from_loc(loc8), timestamp)
        })
        .collect();

    let node = self.nodes.get_mut(node_ep).expect("No node");
    node.ops.extend(
        prepared
            .iter()
            .map(|(loc8, _, _)| (*loc8, NodeOpEntry { is_integrated })),
    );

    for (loc8, hash, timestamp) in prepared {
        let previous = self.ops.insert(loc8, OpEntry { hash, timestamp });
        if let Some(existing) = previous {
            tracing::warn!(
                "inserted same op twice. this could be significant if dealing with custom timestamps. {:?}",
                existing
            );
        }
    }

    node.gossip.new_integrated_data();
}
/// Like [`Self::add_ops_timed`], but stamps each op with `Timestamp::now()`,
/// taken when that op is pulled from the iterator.
pub fn add_ops_now<L: Into<Loc8>, O: IntoIterator<Item = L>>(
    &mut self,
    node_ep: &NodeEp,
    is_integrated: bool,
    ops: O,
) {
    let stamp_now = |op: L| (op, Timestamp::now());
    let stamped = ops.into_iter().map(stamp_now);
    self.add_ops_timed(node_ep, is_integrated, stamped)
}
/// Return the locations of all ops held by `node_ep`, each passed through
/// `Loc8::to_unsigned`, as an ordered set.
///
/// # Panics
/// Panics if `node_ep` is not registered.
pub fn get_ops_loc8(&mut self, node_ep: &NodeEp) -> BTreeSet<Loc8> {
    let node = self.nodes.get(node_ep).unwrap();
    let mut held = BTreeSet::new();
    for loc in node.ops.keys() {
        held.insert(Loc8::to_unsigned(*loc));
    }
    held
}
/// Find the node that hosts a local agent at `loc8`, if any.
///
/// Uses the hashed `contains_key` lookup on each node's local-agent map
/// instead of scanning all of its keys.
pub(super) fn node_for_local_agent_loc8(&self, loc8: Loc8) -> Option<&NodeEntry> {
    self.nodes
        .values()
        .find(|n| n.local_agents.contains_key(&loc8))
}
/// Find, for mutation, the node that hosts a local agent at `loc8`, if any.
///
/// Uses the hashed `contains_key` lookup on each node's local-agent map
/// instead of scanning all of its keys.
pub(super) fn node_for_local_agent_loc8_mut(&mut self, loc8: Loc8) -> Option<&mut NodeEntry> {
    self.nodes
        .values_mut()
        .find(|n| n.local_agents.contains_key(&loc8))
}
/// Find, for mutation, the node hosting the local agent whose location
/// (reduced to a `Loc8`) matches that of `hash`.
pub(super) fn node_for_local_agent_hash_mut(
    &mut self,
    hash: &KitsuneAgent,
) -> Option<&mut NodeEntry> {
    self.node_for_local_agent_loc8_mut(hash.get_loc().as_loc8())
}
/// Return the first local agent entry found at `loc8` on any node.
pub(super) fn local_agent_by_loc8(&self, loc8: Loc8) -> Option<&AgentEntry> {
    self.nodes
        .values()
        .find_map(|n| n.local_agent_by_loc8(loc8))
}
/// Return the first local agent entry on any node that matches `hash`.
pub(super) fn local_agent_by_hash(&self, hash: &KitsuneAgent) -> Option<&AgentEntry> {
    self.nodes
        .values()
        .find_map(|n| n.local_agent_by_hash(hash))
}
/// Mutable access to the local-agent table of `node`.
///
/// # Panics
/// Panics if `node` is not registered.
pub(super) fn local_agents_for_node(
    &mut self,
    node: &NodeEp,
) -> &mut HashMap<Loc8, AgentEntry> {
    let entry = self.nodes.get_mut(node).expect("Node not added");
    &mut entry.local_agents
}
/// Mutable access to the remote-agent table of `node`.
///
/// # Panics
/// Panics if `node` is not registered.
pub(super) fn remote_agents_for_node(
    &mut self,
    node: &NodeEp,
) -> &mut HashMap<Loc8, AgentInfoSigned> {
    let entry = self.nodes.get_mut(node).expect("Node not added");
    &mut entry.remote_agents
}
}
/// Description of an agent to add to a switchboard node: a location plus the
/// arc it starts out with.
#[derive(Debug, Clone, derive_more::AsRef)]
pub struct SwitchboardAgent {
/// The agent's location; the agent key is derived from it.
pub(super) loc: Loc8,
/// Arc range used to build the agent's storage arc when it is added.
initial_arc: DhtArcRange<Loc8>,
}
impl SwitchboardAgent {
    /// An agent at `loc` whose initial arc is `DhtArcRange::Full`.
    pub fn full<L: Copy + Into<Loc8>>(loc: L) -> Self {
        let loc: Loc8 = loc.into();
        Self {
            loc,
            initial_arc: DhtArcRange::Full,
        }
    }

    /// An agent whose initial arc is `Bounded(lo, hi)` and whose location is
    /// `lo`.
    pub fn from_bounds<L: Into<Loc8>>(lo: L, hi: L) -> Self {
        let (lo, hi): (Loc8, Loc8) = (lo.into(), hi.into());
        Self {
            loc: lo,
            initial_arc: DhtArcRange::Bounded(lo, hi),
        }
    }

    /// An agent located at `start` whose initial arc begins at `start`.
    ///
    /// A `len` of zero gives an empty arc. Otherwise the arc is
    /// `Bounded(start, start + len)`, with the addition wrapping within `u8`.
    ///
    /// NOTE(review): if `Bounded` treats its upper bound as inclusive, this
    /// covers `len + 1` Loc8 steps rather than `len` - confirm the intent.
    ///
    /// # Panics
    /// Panics if `ArqBounds::from_interval` cannot represent the canonical
    /// form of the arc at the power computed for its length.
    pub fn from_start_and_len<L: Into<Loc8>>(topo: &Topology, start: L, len: u8) -> Self {
        let start: Loc8 = start.into();
        let initial_arc = match len {
            0 => DhtArcRange::Empty,
            _ => {
                let end_raw = start.as_u8().wrapping_add(len);
                DhtArcRange::Bounded(start, Loc8::from(i32::from(end_raw)))
            }
        };

        // Validate the arc up front; the computed bounds themselves are discarded.
        let canonical = initial_arc.canonical();
        let (power, _count) = power_and_count_from_length(&topo.space, canonical.length(), 16);
        if ArqBounds::from_interval(topo, power, canonical).is_none() {
            panic!("Arc is not quantizable. Power is {}", power);
        }

        Self {
            loc: start,
            initial_arc,
        }
    }
}
/// A shared [`SwitchboardState`] bundled with pairs of task join handles.
///
/// Nothing else in this part of the file constructs or reads this type.
// The tuple-of-join-handles field type trips clippy's complexity lint.
#[allow(clippy::type_complexity)]
pub struct SpaceEntry {
/// Shared switchboard state.
state: Share<SwitchboardState>,
/// Pairs of (ghost-actor task, kitsune task) join handles.
tasks: Vec<(
tokio::task::JoinHandle<GhostResult<()>>,
tokio::task::JoinHandle<KitsuneResult<()>>,
)>,
}
/// Everything the switchboard tracks for a single node.
#[derive(Debug)]
pub struct NodeEntry {
/// Agents hosted on this node, keyed by location.
pub(super) local_agents: HashMap<Loc8, AgentEntry>,
/// Infos of agents hosted elsewhere that this node knows about, keyed by location.
pub(super) remote_agents: HashMap<Loc8, AgentInfoSigned>,
/// Ops held by this node, keyed by location.
pub(super) ops: HashMap<Loc8, NodeOpEntry>,
/// The gossip module instance created for this node.
pub(super) gossip: Arc<ShardedGossip>,
}
impl NodeEntry {
    /// Look up the local agent at `loc8`.
    pub(super) fn local_agent_by_loc8(&self, loc8: Loc8) -> Option<&AgentEntry> {
        let agents = &self.local_agents;
        agents.get(&loc8)
    }

    /// Look up the local agent at `loc8` for mutation.
    pub(super) fn local_agent_by_loc8_mut(&mut self, loc8: Loc8) -> Option<&mut AgentEntry> {
        let agents = &mut self.local_agents;
        agents.get_mut(&loc8)
    }

    /// Look up the local agent whose location, reduced to a `Loc8`, matches
    /// that of `hash`.
    pub(super) fn local_agent_by_hash(&self, hash: &KitsuneAgent) -> Option<&AgentEntry> {
        let loc8 = hash.get_loc().as_loc8();
        self.local_agent_by_loc8(loc8)
    }

    /// Mutable variant of [`Self::local_agent_by_hash`].
    pub(super) fn local_agent_by_hash_mut(
        &mut self,
        hash: &KitsuneAgent,
    ) -> Option<&mut AgentEntry> {
        let loc8 = hash.get_loc().as_loc8();
        self.local_agent_by_loc8_mut(loc8)
    }

    /// The infos of all local agents together with all known remote agents.
    pub(super) fn all_agent_infos(&self) -> HashSet<AgentInfoSigned> {
        let mut infos: HashSet<AgentInfoSigned> = self
            .local_agents
            .values()
            .map(|entry| entry.info.clone())
            .collect();
        infos.extend(self.remote_agents.values().cloned());
        infos
    }

    /// The `Loc8` location of every agent returned by
    /// [`Self::all_agent_infos`].
    pub(super) fn all_agent_locs(&self) -> HashSet<Loc8> {
        let mut locs = HashSet::new();
        for info in self.all_agent_infos() {
            locs.insert(info.to_agent_arc().0.get_loc().as_loc8());
        }
        locs
    }
}
/// State kept for one local agent on a node.
#[derive(Debug, Clone)]
pub struct AgentEntry {
/// The agent's signed info.
pub info: AgentInfoSigned,
}
impl AgentEntry {
pub fn new(info: AgentInfoSigned) -> Self {
Self { info }
}
}
/// Per-node record for an op the node holds.
#[derive(Debug, Clone)]
pub struct NodeOpEntry {
/// The `is_integrated` flag given when the op was added to the node.
pub is_integrated: bool,
}
/// Switchboard-wide record for an op.
#[derive(Debug, Clone)]
pub struct OpEntry {
/// Op hash derived from the op's location.
pub hash: KOpHash,
/// Timestamp supplied when the op was added.
pub timestamp: Timestamp,
}
/// Build an agent key from a location, using the location's 36
/// representative test bytes.
fn agent_from_loc<L: Into<DhtLocation>>(loc8: L) -> KAgent {
    let bytes = Into::<DhtLocation>::into(loc8).to_representative_test_bytes_36();
    Arc::new(KitsuneAgent::new(bytes))
}
/// Build an op hash from a location, using the location's 36
/// representative test bytes.
fn op_hash_from_loc<L: Into<DhtLocation>>(loc8: L) -> KOpHash {
    let bytes = Into::<DhtLocation>::into(loc8).to_representative_test_bytes_36();
    Arc::new(KitsuneOpHash::new(bytes))
}
/// Fabricate a signed agent info for `agent` in `space`, reachable at the
/// local address of `node` and storing `interval`.
///
/// The info is stamped `signed_at_ms = 0`, carries `u64::MAX` as its
/// expiry, and its signature comes from the `KitsuneSignature` fixturator.
///
/// # Panics
/// Panics if the node's local address is unavailable or if encoding fails.
fn fake_agent_info(
    space: KSpace,
    node: &NodeEp,
    agent: KAgent,
    interval: DhtArc,
) -> AgentInfoSigned {
    use crate::fixt::*;

    let url_list = vec![node.local_addr().unwrap()];

    // Encode the meta info; the encoded form always carries a zero half-length.
    let mut meta_buf = Vec::new();
    kitsune_p2p_types::codec::rmp_encode(
        &mut meta_buf,
        AgentMetaInfoEncode {
            dht_storage_arc_half_length: 0,
        },
    )
    .unwrap();

    // Encode the info that becomes `encoded_bytes`.
    let mut info_buf = Vec::new();
    kitsune_p2p_types::codec::rmp_encode(
        &mut info_buf,
        AgentInfoEncode {
            space: space.clone(),
            agent: agent.clone(),
            urls: url_list.clone(),
            signed_at_ms: 0,
            expires_after_ms: u64::MAX,
            meta_info: meta_buf.into_boxed_slice(),
        },
    )
    .unwrap();

    AgentInfoSigned(Arc::new(AgentInfoInner {
        space,
        agent,
        storage_arc: interval,
        url_list,
        signed_at_ms: 0,
        expires_at_ms: u64::MAX,
        signature: Arc::new(fixt::prelude::fixt!(KitsuneSignature)),
        encoded_bytes: info_buf.into_boxed_slice(),
    }))
}
/// Agent keys and op hashes built from a `Loc8` must report that same `Loc8`
/// as their location, for both signed and unsigned spellings of the value.
#[test]
fn hash_from_loc8_roundtrip() {
    // Signed spellings first, then unsigned ones, in the original order.
    const SAMPLES: [i32; 11] = [0, 1, -1, -128, 127, 0, 1, 254, 255, 127, 128];
    for raw in SAMPLES {
        let loc: Loc8 = raw.into();
        assert_eq!(agent_from_loc(loc).get_loc().as_loc8(), loc);
        assert_eq!(op_hash_from_loc(loc).get_loc().as_loc8(), loc);
    }
}