use std::collections::HashSet;
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use iroh_base::EndpointId;
use ed25519_dalek::SigningKey;
use crate::channel::{NetCommand, NetEvent};
use crate::identity::iroh_secret;
use crate::protocol::*;
/// Static configuration handed to [`spawn`] when the network thread starts.
///
/// The default configuration has no peers and no gossip topic.
#[derive(Default)]
pub struct PeerConfig {
    /// Peers passed to the DHT node and, when gossip is enabled, to the
    /// gossip topic subscription.
    pub peers: Vec<EndpointId>,
    /// Name of the gossip topic to join; `None` leaves gossip disabled.
    pub gossip_topic: Option<String>,
}
/// A captured view of a store: a blob reader together with the branch heads
/// that were current when the snapshot was taken.
pub struct StoreSnapshot<R> {
    /// Reader used to look blobs up by hash.
    pub reader: R,
    /// `(branch id, head hash)` pairs recorded at capture time.
    pub branches: Vec<(RawBranchId, RawHash)>,
}

impl StoreSnapshot<()> {
    /// Builds a snapshot from `store`.
    ///
    /// Branches whose id or head cannot be read, or that have no head, are
    /// left out. Returns `None` when the branch listing itself or the reader
    /// cannot be obtained.
    pub fn from_store<S>(store: &mut S) -> Option<StoreSnapshot<S::Reader>>
    where
        S: triblespace_core::repo::BlobStore<triblespace_core::value::schemas::hash::Blake3>
            + triblespace_core::repo::BranchStore<triblespace_core::value::schemas::hash::Blake3>,
    {
        // Materialise the ids first so the listing no longer borrows `store`
        // when the heads are queried.
        let branch_ids: Vec<triblespace_core::id::Id> = store
            .branches()
            .ok()?
            .filter_map(|entry| entry.ok())
            .collect();

        let branches: Vec<(RawBranchId, RawHash)> = branch_ids
            .into_iter()
            .filter_map(|id| match store.head(id) {
                Ok(Some(head)) => {
                    let raw_id: [u8; 16] = id.into();
                    Some((raw_id, head.raw))
                }
                _ => None,
            })
            .collect();

        let reader = store.reader().ok()?;
        Some(StoreSnapshot { reader, branches })
    }
}
/// Type-erased, read-only view of a store snapshot.
///
/// The network code keeps the current snapshot as a `Box<dyn AnySnapshot>`
/// and answers incoming peer requests from it.
pub trait AnySnapshot: Send + 'static {
/// Returns the bytes of the blob stored under `hash`, or `None` if it cannot be read.
fn get_blob(&self, hash: &RawHash) -> Option<Vec<u8>>;
/// Returns `true` when a blob for `hash` is available in this snapshot.
fn has_blob(&self, hash: &RawHash) -> bool;
/// Returns every `(branch id, head hash)` pair known to this snapshot.
fn list_branches(&self) -> &[(RawBranchId, RawHash)];
/// Returns the head hash recorded for `branch`, or `None` if the branch is unknown.
fn head(&self, branch: &RawBranchId) -> Option<RawHash>;
}
impl<R> AnySnapshot for StoreSnapshot<R>
where
    R: triblespace_core::repo::BlobStoreGet<triblespace_core::value::schemas::hash::Blake3>
        + Send
        + 'static,
{
    /// Reads the blob stored under `hash` and copies it into an owned buffer.
    ///
    /// Any reader error is reported as `None`.
    fn get_blob(&self, hash: &RawHash) -> Option<Vec<u8>> {
        read_snapshot_bytes(&self.reader, hash).map(|bytes| bytes.to_vec())
    }

    /// Reports whether the reader can return a blob for `hash`.
    ///
    /// Performs the same lookup as `get_blob` but does not copy the blob
    /// contents; this matters because the children scan probes many
    /// candidate hashes per parent blob.
    fn has_blob(&self, hash: &RawHash) -> bool {
        read_snapshot_bytes(&self.reader, hash).is_some()
    }

    /// Returns the branch heads captured when the snapshot was taken.
    fn list_branches(&self) -> &[(RawBranchId, RawHash)] {
        &self.branches
    }

    /// Looks up the captured head of `branch` (first match wins).
    fn head(&self, branch: &RawBranchId) -> Option<RawHash> {
        self.branches
            .iter()
            .find(|(id, _)| id == branch)
            .map(|(_, head)| *head)
    }
}

/// Looks `hash` up in `reader` as an untyped blob, mapping reader errors to `None`.
fn read_snapshot_bytes<R>(reader: &R, hash: &RawHash) -> Option<anybytes::Bytes>
where
    R: triblespace_core::repo::BlobStoreGet<triblespace_core::value::schemas::hash::Blake3>,
{
    use triblespace_core::blob::schemas::UnknownBlob;
    use triblespace_core::value::schemas::hash::{Blake3, Handle};
    use triblespace_core::value::Value;

    let handle = Value::<Handle<Blake3, UnknownBlob>>::new(*hash);
    reader.get::<anybytes::Bytes, UnknownBlob>(handle).ok()
}
/// Cloneable handle for sending commands to the network thread and for
/// publishing the snapshot it serves to peers.
#[derive(Clone)]
pub struct NetSender {
    cmd_tx: mpsc::Sender<NetCommand>,
    snapshot: Arc<Mutex<Option<Box<dyn AnySnapshot>>>>,
    id: EndpointId,
}

impl NetSender {
    /// Endpoint id of the local node.
    pub fn id(&self) -> EndpointId {
        self.id
    }

    /// Queues an announce command for `hash`. A send failure is ignored.
    pub fn announce(&self, hash: RawHash) {
        self.post(NetCommand::Announce(hash));
    }

    /// Queues a gossip command carrying `branch` and its `head`. A send
    /// failure is ignored.
    pub fn gossip(&self, branch: RawBranchId, head: RawHash) {
        self.post(NetCommand::Gossip { branch, head });
    }

    /// Queues a track command for `branch` on `peer`. A send failure is ignored.
    pub fn track(&self, peer: EndpointId, branch: RawBranchId) {
        self.post(NetCommand::Track { peer, branch });
    }

    /// Asks `peer` for its branch list and blocks until the reply arrives.
    ///
    /// # Errors
    /// Fails when the command cannot be queued, when the reply channel is
    /// closed without an answer, or when the request itself reports an error.
    pub fn list_remote_branches(
        &self,
        peer: EndpointId,
    ) -> anyhow::Result<Vec<(triblespace_core::id::Id, RawHash)>> {
        self.round_trip(|reply| NetCommand::ListBranches { peer, reply })
    }

    /// Asks `peer` for the head of `branch` and blocks until the reply arrives.
    ///
    /// # Errors
    /// Same failure modes as [`NetSender::list_remote_branches`].
    pub fn head_of_remote(
        &self,
        peer: EndpointId,
        branch: RawBranchId,
    ) -> anyhow::Result<Option<RawHash>> {
        self.round_trip(|reply| NetCommand::HeadOfRemote { peer, branch, reply })
    }

    /// Requests the blob `hash`, using `peer` as the hint peer, and blocks
    /// until the reply arrives.
    ///
    /// # Errors
    /// Same failure modes as [`NetSender::list_remote_branches`].
    pub fn fetch(
        &self,
        peer: EndpointId,
        hash: RawHash,
    ) -> anyhow::Result<Option<Vec<u8>>> {
        self.round_trip(|reply| NetCommand::Fetch { peer, hash, reply })
    }

    /// Replaces the snapshot served to remote peers.
    ///
    /// # Panics
    /// Panics if the snapshot mutex is poisoned.
    pub fn update_snapshot(&self, snapshot: impl AnySnapshot) {
        let boxed: Box<dyn AnySnapshot> = Box::new(snapshot);
        let mut slot = self.snapshot.lock().unwrap();
        *slot = Some(boxed);
    }

    /// Fire-and-forget send; the result is deliberately discarded.
    fn post(&self, command: NetCommand) {
        let _ = self.cmd_tx.send(command);
    }

    /// Sends a command built around a fresh reply channel and blocks on the answer.
    fn round_trip<T>(
        &self,
        make_command: impl FnOnce(mpsc::Sender<anyhow::Result<T>>) -> NetCommand,
    ) -> anyhow::Result<T> {
        let (reply, response) = mpsc::channel();
        self.cmd_tx
            .send(make_command(reply))
            .map_err(|_| anyhow::anyhow!("network thread dropped"))?;
        response
            .recv()
            .map_err(|_| anyhow::anyhow!("network thread dropped"))?
    }
}
/// Receiving end of the event channel fed by the network thread.
pub struct NetReceiver {
    evt_rx: mpsc::Receiver<NetEvent>,
}

impl NetReceiver {
    /// Returns the next pending event without blocking.
    ///
    /// `None` means no event could be taken right now, whether the queue is
    /// merely empty or the sending side has gone away.
    pub fn try_recv(&self) -> Option<NetEvent> {
        match self.evt_rx.try_recv() {
            Ok(event) => Some(event),
            Err(_) => None,
        }
    }
}
/// Starts the network thread and returns the handles used to talk to it.
///
/// The thread builds its own tokio runtime and runs `host_loop` on it. The
/// join handle is dropped, so the thread is detached.
///
/// # Panics
/// The spawned thread (not the caller) panics if the tokio runtime cannot be
/// created.
pub fn spawn(key: SigningKey, config: PeerConfig) -> (NetSender, NetReceiver) {
    let secret = iroh_secret(&key);
    let id: EndpointId = secret.public().into();

    let (cmd_tx, cmd_rx) = mpsc::channel::<NetCommand>();
    let (evt_tx, evt_rx) = mpsc::channel::<NetEvent>();

    // Shared slot: written through `NetSender::update_snapshot`, read by the
    // protocol handler on the network thread.
    let snapshot: Arc<Mutex<Option<Box<dyn AnySnapshot>>>> = Arc::new(Mutex::new(None));
    let snapshot_for_thread = Arc::clone(&snapshot);

    drop(thread::spawn(move || {
        tokio::runtime::Runtime::new()
            .expect("tokio runtime")
            .block_on(host_loop(secret, config, cmd_rx, evt_tx, snapshot_for_thread));
    }));

    (
        NetSender { cmd_tx, snapshot, id },
        NetReceiver { evt_rx },
    )
}
/// Async body of the network thread.
///
/// Binds an iroh endpoint with `secret`, registers the pile-sync handler, the
/// DHT RPC handler and (when `config.gossip_topic` is set) gossip on a router,
/// then polls `commands` in a loop, spawning one tokio task per command.
/// Results and fetched data are reported through `events` or through the
/// per-command reply channel.
///
/// Returns early only if binding the endpoint fails.
async fn host_loop(
secret: iroh_base::SecretKey,
config: PeerConfig,
commands: mpsc::Receiver<NetCommand>,
events: mpsc::Sender<NetEvent>,
snapshot: Arc<Mutex<Option<Box<dyn AnySnapshot>>>>,
) {
use iroh::endpoint::presets;
use iroh::protocol::Router;
use iroh::Endpoint;
use iroh_gossip::Gossip;
use iroh_gossip::api::GossipSender;
use futures::TryStreamExt;
// Bind failure is logged and ends the function; this drops `commands` and
// `events`, so later sends on the command channel fail.
let ep = match Endpoint::builder(presets::N0).secret_key(secret).bind().await {
Ok(ep) => ep,
Err(e) => { eprintln!("[net] bind failed: {e}"); return; }
};
ep.online().await;
let my_id = ep.id();
let mut router_builder = Router::builder(ep.clone());
// Incoming pile-sync streams are answered from the shared snapshot slot.
let handler = SnapshotHandler { snapshot: snapshot.clone() };
router_builder = router_builder.accept(PILE_SYNC_ALPN, handler);
// DHT setup: a connection pool on the DHT ALPN, a local DHT node seeded with
// `config.peers`, and the node's local sender registered on the router.
let dht_alpn = crate::dht::rpc::ALPN;
let pool = iroh_blobs::util::connection_pool::ConnectionPool::new(
ep.clone(), dht_alpn,
iroh_blobs::util::connection_pool::Options {
max_connections: 64,
idle_timeout: std::time::Duration::from_secs(30),
connect_timeout: std::time::Duration::from_secs(10),
on_connected: None,
},
);
let iroh_pool = crate::dht::pool::IrohPool::new(ep.clone(), pool);
let (rpc, dht_api) = crate::dht::create_node(
my_id, iroh_pool.clone(), config.peers.clone(), Default::default(),
);
iroh_pool.set_self_client(Some(rpc.downgrade()));
let dht_sender = rpc.inner().as_local().expect("local sender");
router_builder = router_builder
.accept(dht_alpn, irpc_iroh::IrohProtocol::with_sender(dht_sender));
// Wrapped in `Some` because the fetch helpers take `&Option<ApiClient>`.
let dht_api = Some(dht_api);
// Stays `None` when no topic is configured or the subscription fails; the
// `Gossip` command is then a no-op.
let mut gossip_sender: Option<GossipSender> = None;
if let Some(topic_name) = config.gossip_topic {
let gossip = Gossip::builder().spawn(ep.clone());
router_builder = router_builder.accept(iroh_gossip::ALPN, gossip.clone());
// The topic id is the BLAKE3 hash of the topic name's bytes.
let topic_id = iroh_gossip::TopicId::from_bytes(
*blake3::hash(topic_name.as_bytes()).as_bytes()
);
let topic = gossip.subscribe(topic_id, config.peers.clone()).await;
// A failed subscription is skipped without logging.
if let Ok(topic) = topic {
let (sender, receiver) = topic.split();
gossip_sender = Some(sender);
let events_tx = events.clone();
let ep2 = ep.clone();
let dht_api2 = dht_api.clone();
// Receiver task: runs until the gossip stream yields an error or ends.
tokio::spawn(async move {
let mut receiver = receiver;
while let Ok(Some(event)) = receiver.try_next().await {
match &event {
iroh_gossip::api::Event::Received(msg) => {
// Head announcement layout (81 bytes): tag 0x01, 16-byte branch id,
// 32-byte head hash, 32-byte publisher key. Anything else is ignored.
if msg.content.len() == 81 && msg.content[0] == 0x01 {
let mut branch = [0u8; 16];
branch.copy_from_slice(&msg.content[1..17]);
let mut head = [0u8; 32];
head.copy_from_slice(&msg.content[17..49]);
let mut publisher = [0u8; 32];
publisher.copy_from_slice(&msg.content[49..81]);
let ep2 = ep2.clone();
let events_tx2 = events_tx.clone();
let dht2 = dht_api2.clone();
// Fetch from the publisher when its bytes parse as a public key;
// otherwise fall back to the peer that delivered the message.
let fetch_peer = if let Ok(pk) = iroh_base::PublicKey::from_bytes(&publisher) {
pk.into()
} else {
msg.delivered_from.into()
};
// Fetch in a separate task so the receive loop is not held up.
tokio::spawn(async move {
eprintln!("[net] fetching HEAD {} from publisher {}", hex::encode(&head[..4]), hex::encode(&publisher[..4]));
track_known_head(&ep2, fetch_peer, branch, head, publisher, &dht2, &events_tx2).await;
});
}
}
iroh_gossip::api::Event::NeighborUp(peer) => {
eprintln!("[net] gossip neighbor up: {}", peer.fmt_short());
}
iroh_gossip::api::Event::NeighborDown(peer) => {
eprintln!("[net] gossip neighbor down: {}", peer.fmt_short());
}
_ => {}
}
}
});
}
}
// The router is bound to a local so it is not dropped while the loop below runs.
let _router = router_builder.spawn();
// Command pump: drain everything currently queued, then sleep 50 ms.
// NOTE(review): `try_recv` errors are not distinguished, so a disconnected
// command channel looks the same as an empty one and this loop never exits
// once every `NetSender` is dropped — confirm that is intended.
loop {
while let Ok(cmd) = commands.try_recv() {
match cmd {
NetCommand::Announce(hash) => {
// Announce this node as a provider of `hash`; the result is ignored.
if let Some(api) = &dht_api {
let api = api.clone();
tokio::spawn(async move {
let blake3_hash = blake3::Hash::from_bytes(hash);
let _ = api.announce_provider(blake3_hash, my_id).await;
});
}
}
NetCommand::Gossip { branch, head } => {
if let Some(sender) = &gossip_sender {
// Same 81-byte layout the receiver task parses, with this node as publisher.
let mut msg = Vec::with_capacity(81);
msg.push(0x01);
msg.extend_from_slice(&branch);
msg.extend_from_slice(&head);
msg.extend_from_slice(my_id.as_bytes());
let sender = sender.clone();
tokio::spawn(async move {
let _ = sender.broadcast(msg.into()).await;
});
}
}
NetCommand::Track { peer, branch } => {
// Ask `peer` for the branch head, then fetch what is reachable from it.
// Failures are logged and the task ends without emitting an event.
let ep = ep.clone();
let events_tx = events.clone();
let dht = dht_api.clone();
tokio::spawn(async move {
let conn = match ep.connect(peer, PILE_SYNC_ALPN).await {
Ok(c) => c,
Err(e) => { eprintln!("[net] connect: {e}"); return; }
};
let head = match op_head(&conn, &branch).await {
Ok(Some(h)) => h,
Ok(None) => { eprintln!("[net] no head"); return; }
Err(e) => { eprintln!("[net] head: {e}"); return; }
};
conn.close(0u32.into(), b"ok");
// The tracked peer's own key bytes are used as the publisher.
let mut publisher = [0u8; 32];
publisher.copy_from_slice(peer.as_bytes());
track_known_head(&ep, peer, branch, head, publisher, &dht, &events_tx).await;
});
}
NetCommand::ListBranches { peer, reply } => {
let ep = ep.clone();
tokio::spawn(async move {
let result = async {
let conn = ep.connect(peer, PILE_SYNC_ALPN).await
.map_err(|e| anyhow::anyhow!("connect: {e}"))?;
let pairs = op_list(&conn).await?;
conn.close(0u32.into(), b"ok");
// Branch ids for which `Id::new` returns `None` are dropped from the result.
let out: Vec<(triblespace_core::id::Id, RawHash)> = pairs
.into_iter()
.filter_map(|(bid, head)| {
triblespace_core::id::Id::new(bid).map(|id| (id, head))
})
.collect();
Ok(out)
}.await;
// The requester may have gone away; a failed reply is ignored.
let _ = reply.send(result);
});
}
NetCommand::HeadOfRemote { peer, branch, reply } => {
let ep = ep.clone();
tokio::spawn(async move {
let result = async {
let conn = ep.connect(peer, PILE_SYNC_ALPN).await
.map_err(|e| anyhow::anyhow!("connect: {e}"))?;
let head = op_head(&conn, &branch).await?;
conn.close(0u32.into(), b"ok");
Ok(head)
}.await;
let _ = reply.send(result);
});
}
NetCommand::Fetch { peer, hash, reply } => {
let ep = ep.clone();
let dht = dht_api.clone();
tokio::spawn(async move {
let result = fetch_blob(&ep, &hash, &dht, peer).await;
let _ = reply.send(result);
});
}
}
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
async fn fetch_blob(
ep: &iroh::Endpoint,
hash: &RawHash,
dht: &Option<crate::dht::api::ApiClient>,
hint_peer: EndpointId,
) -> anyhow::Result<Option<Vec<u8>>> {
let verify = |data: &[u8]| -> bool {
let computed = blake3::hash(data);
computed.as_bytes() == hash
};
if let Some(api) = dht {
let blake3_hash = blake3::Hash::from_bytes(*hash);
if let Ok(providers) = api.find_providers(blake3_hash).await {
for provider in providers {
if let Ok(conn) = ep.connect(provider, PILE_SYNC_ALPN).await {
if let Ok(Some(data)) = op_get_blob(&conn, hash).await {
conn.close(0u32.into(), b"ok");
if verify(&data) {
return Ok(Some(data));
}
eprintln!("[net] hash mismatch from DHT provider {}", provider.fmt_short());
}
}
}
}
}
if let Ok(conn) = ep.connect(hint_peer, PILE_SYNC_ALPN).await {
if let Ok(Some(data)) = op_get_blob(&conn, hash).await {
conn.close(0u32.into(), b"ok");
if verify(&data) {
return Ok(Some(data));
}
eprintln!("[net] hash mismatch from hint peer {}", hint_peer.fmt_short());
}
}
Ok(None)
}
/// Fetches `head` and every blob reachable from it, level by level, emitting
/// each fetched blob as a `NetEvent::Blob`.
///
/// `peer` is asked for the children of each blob via `op_children`; the blobs
/// themselves are obtained through `fetch_blob` with `peer` as the hint. A
/// child is expanded further only if its data was actually fetched, and each
/// hash is visited at most once.
///
/// # Errors
/// Fails when connecting to `peer` or listing children fails. Failed event
/// sends are ignored.
async fn fetch_reachable(
    ep: &iroh::Endpoint,
    peer: EndpointId,
    head: &RawHash,
    dht: &Option<crate::dht::api::ApiClient>,
    events: &mpsc::Sender<NetEvent>,
) -> anyhow::Result<()> {
    let mut seen: HashSet<RawHash> = HashSet::new();
    seen.insert(*head);
    if let Some(data) = fetch_blob(ep, head, dht, peer).await? {
        let _ = events.send(NetEvent::Blob(data));
    }

    let mut current_level = vec![*head];
    while !current_level.is_empty() {
        let mut next_level = Vec::new();
        for parent in &current_level {
            let conn = ep
                .connect(peer, PILE_SYNC_ALPN)
                .await
                .map_err(|e| anyhow::anyhow!("connect: {e}"))?;
            let children = op_children(&conn, parent).await?;
            conn.close(0u32.into(), b"ok");
            for hash in children {
                // `insert` returns false for hashes already visited.
                if !seen.insert(hash) {
                    continue;
                }
                if let Some(data) = fetch_blob(ep, &hash, dht, peer).await? {
                    let _ = events.send(NetEvent::Blob(data));
                    next_level.push(hash);
                }
            }
        }
        current_level = next_level;
    }
    Ok(())
}
/// Fetches everything reachable from `head` and, on success, emits a
/// `NetEvent::Head` for `branch`.
///
/// A fetch error is logged and no head event is sent. A failed event send is
/// ignored.
async fn track_known_head(
    ep: &iroh::Endpoint,
    fetch_peer: EndpointId,
    branch: RawBranchId,
    head: RawHash,
    publisher: crate::channel::PublisherKey,
    dht: &Option<crate::dht::api::ApiClient>,
    events: &mpsc::Sender<NetEvent>,
) {
    match fetch_reachable(ep, fetch_peer, &head, dht, events).await {
        Ok(()) => {
            let _ = events.send(NetEvent::Head { branch, head, publisher });
        }
        Err(e) => {
            eprintln!("[net] fetch error: {e}");
        }
    }
}
/// Protocol handler state: a shared handle to the snapshot slot that incoming
/// requests are answered from.
#[derive(Clone)]
struct SnapshotHandler {
    snapshot: Arc<Mutex<Option<Box<dyn AnySnapshot>>>>,
}

impl std::fmt::Debug for SnapshotHandler {
    /// Prints only the type name; the snapshot contents are not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("SnapshotHandler")
    }
}
impl iroh::protocol::ProtocolHandler for SnapshotHandler {
async fn accept(&self, connection: iroh::endpoint::Connection) -> Result<(), iroh::protocol::AcceptError> {
let snap = self.snapshot.clone();
loop {
let (mut send, mut recv) = match connection.accept_bi().await {
Ok(pair) => pair,
Err(_) => break,
};
let snap = snap.clone();
tokio::spawn(async move {
if let Err(e) = serve_from_snapshot(&snap, &mut send, &mut recv).await {
eprintln!("handler error: {e}");
}
let _ = send.finish();
});
}
Ok(())
}
}
/// Answers a single request read from `recv`, using the current snapshot.
///
/// The first byte selects the operation:
/// - `OP_LIST`: sends every `(branch id, head)` pair, then `NIL_BRANCH_ID`.
/// - `OP_HEAD`: reads a branch id and sends its head, or `NIL_HASH` if unknown.
/// - `OP_GET_BLOB`: reads a hash and sends a big-endian `u64` length followed
///   by the bytes, or `u64::MAX` when the blob is missing.
/// - `OP_CHILDREN`: reads a hash and sends each 32-byte chunk of that blob
///   which is itself a blob in the snapshot, then `NIL_HASH`.
///
/// Any other opcode is ignored. With no snapshot installed, every operation
/// answers as if the store were empty. The snapshot lock is always released
/// before anything is written to `send`.
///
/// # Panics
/// Panics if the snapshot mutex is poisoned.
async fn serve_from_snapshot(
    snap_arc: &Arc<Mutex<Option<Box<dyn AnySnapshot>>>>,
    send: &mut iroh::endpoint::SendStream,
    recv: &mut iroh::endpoint::RecvStream,
) -> anyhow::Result<()> {
    let op = recv_u8(recv).await?;
    match op {
        OP_LIST => {
            let listing = snap_arc
                .lock()
                .unwrap()
                .as_ref()
                .map_or_else(Vec::new, |snap| snap.list_branches().to_vec());
            for (branch_id, head) in &listing {
                send_branch_id(send, branch_id).await?;
                send_hash(send, head).await?;
            }
            send_branch_id(send, &NIL_BRANCH_ID).await?;
        }
        OP_HEAD => {
            let branch_id = recv_branch_id(recv).await?;
            let head = snap_arc
                .lock()
                .unwrap()
                .as_ref()
                .and_then(|snap| snap.head(&branch_id))
                .unwrap_or(NIL_HASH);
            send_hash(send, &head).await?;
        }
        OP_GET_BLOB => {
            let wanted = recv_hash(recv).await?;
            let found = snap_arc
                .lock()
                .unwrap()
                .as_ref()
                .and_then(|snap| snap.get_blob(&wanted));
            if let Some(payload) = found {
                send_u64_be(send, payload.len() as u64).await?;
                send.write_all(&payload)
                    .await
                    .map_err(|e| anyhow::anyhow!("send: {e}"))?;
            } else {
                send_u64_be(send, u64::MAX).await?;
            }
        }
        OP_CHILDREN => {
            let parent_hash = recv_hash(recv).await?;
            let children: Vec<RawHash> = {
                // Keep the guard inside this block so it is gone before the
                // first write below.
                let guard = snap_arc.lock().unwrap();
                let mut found = Vec::new();
                if let Some(snap) = guard.as_ref() {
                    if let Some(parent_data) = snap.get_blob(&parent_hash) {
                        // Only whole 32-byte chunks are candidates; a short
                        // trailing chunk is skipped.
                        found.extend(
                            parent_data
                                .chunks_exact(32)
                                .map(|chunk| {
                                    let mut candidate = [0u8; 32];
                                    candidate.copy_from_slice(chunk);
                                    candidate
                                })
                                .filter(|candidate| snap.has_blob(candidate)),
                        );
                    }
                }
                found
            };
            for child in &children {
                send_hash(send, child).await?;
            }
            send_hash(send, &NIL_HASH).await?;
        }
        _ => {}
    }
    Ok(())
}