use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use iroh_base::{EndpointAddr, EndpointId};
use ed25519_dalek::SigningKey;
use tracing::{debug, debug_span, error, info, info_span, instrument, warn, Instrument};
use crate::channel::{NetCommand, NetEvent};
use crate::identity::iroh_secret;
use crate::protocol::*;
fn op_name(op: u8) -> &'static str {
match op {
OP_AUTH => "AUTH",
OP_GET_BLOB => "GET_BLOB",
OP_CHILDREN => "CHILDREN",
_ => "UNKNOWN",
}
}
fn dot_stripped_default_relay_map() -> iroh::RelayMap {
let original = iroh::defaults::prod::default_relay_map();
let stripped_urls: Vec<String> = original
.urls::<Vec<_>>()
.into_iter()
.map(|relay_url| {
let mut url: url::Url = relay_url.into();
if let Some(host) = url.host_str() {
if let Some(trimmed) = host.strip_suffix('.') {
let trimmed = trimmed.to_string();
let _ = url.set_host(Some(&trimmed));
}
}
url.to_string()
})
.collect();
iroh::RelayMap::try_from_iter(stripped_urls.iter().map(|s| s.as_str()))
.expect("stripped relay URLs are valid (transformed from valid input)")
}
pub fn dot_stripped_endpoint_addr(mut addr: EndpointAddr) -> EndpointAddr {
use iroh_base::TransportAddr;
addr.addrs = addr
.addrs
.into_iter()
.map(|t| match t {
TransportAddr::Relay(relay_url) => {
let mut url: url::Url = relay_url.clone().into();
if let Some(host) = url.host_str() {
if let Some(trimmed) = host.strip_suffix('.') {
let trimmed = trimmed.to_string();
let _ = url.set_host(Some(&trimmed));
}
}
TransportAddr::Relay(iroh_base::RelayUrl::from(url))
}
other => other,
})
.collect();
addr
}
pub struct PeerConfig {
pub peers: Vec<EndpointAddr>,
pub gossip: bool,
pub team_root: ed25519_dalek::VerifyingKey,
pub revoked: std::collections::HashSet<ed25519_dalek::VerifyingKey>,
pub self_cap: RawHash,
pub direction: SyncDirection,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SyncDirection {
#[default]
Bidirectional,
ReadOnly,
WriteOnly,
}
pub struct StoreSnapshot<R> {
pub reader: R,
pub branches: Vec<(RawBranchId, RawHash)>,
}
impl StoreSnapshot<()> {
pub fn from_store<S>(store: &mut S) -> Option<StoreSnapshot<S::Reader>>
where
S: triblespace_core::repo::BlobStore
+ triblespace_core::repo::BranchStore,
{
let ids: Vec<triblespace_core::id::Id> = store.branches().ok()?
.filter_map(|r| r.ok())
.collect();
let mut branches = Vec::new();
for id in ids {
if let Ok(Some(head)) = store.head(id) {
let id_bytes: [u8; 16] = id.into();
branches.push((id_bytes, head.raw));
}
}
let reader = store.reader().ok()?;
Some(StoreSnapshot { reader, branches })
}
}
pub trait AnySnapshot: Send + 'static {
fn get_blob(&self, hash: &RawHash) -> Option<Vec<u8>>;
fn has_blob(&self, hash: &RawHash) -> bool;
fn list_branches(&self) -> &[(RawBranchId, RawHash)];
fn all_simple_archive_blobs(
&self,
) -> Vec<triblespace_core::blob::Blob<
triblespace_core::blob::encodings::simplearchive::SimpleArchive,
>>;
}
impl<R> AnySnapshot for StoreSnapshot<R>
where
R: triblespace_core::repo::BlobStoreGet
+ triblespace_core::repo::BlobStoreList
+ Send + 'static,
{
fn get_blob(&self, hash: &RawHash) -> Option<Vec<u8>> {
use triblespace_core::blob::encodings::UnknownBlob;
use triblespace_core::inline::Inline;
use triblespace_core::inline::encodings::hash::Handle;
let handle = Inline::<Handle<UnknownBlob>>::new(*hash);
self.reader.get::<anybytes::Bytes, UnknownBlob>(handle).ok().map(|b| b.to_vec())
}
fn has_blob(&self, hash: &RawHash) -> bool {
self.get_blob(hash).is_some()
}
fn list_branches(&self) -> &[(RawBranchId, RawHash)] {
&self.branches
}
fn all_simple_archive_blobs(
&self,
) -> Vec<triblespace_core::blob::Blob<
triblespace_core::blob::encodings::simplearchive::SimpleArchive,
>> {
use triblespace_core::blob::Blob;
use triblespace_core::blob::encodings::simplearchive::SimpleArchive;
use triblespace_core::inline::Inline;
use triblespace_core::inline::encodings::hash::Handle;
let mut out = Vec::new();
for handle_result in self.reader.blobs() {
let Ok(handle) = handle_result else { continue };
let typed: Inline<Handle<SimpleArchive>> = Inline::new(handle.raw);
if let Ok(blob) = self.reader.get::<Blob<SimpleArchive>, SimpleArchive>(typed) {
out.push(blob);
}
}
out
}
}
#[derive(Clone)]
pub struct NetSender {
cmd_tx: mpsc::Sender<NetCommand>,
snapshot: Arc<Mutex<Option<Box<dyn AnySnapshot>>>>,
revoked: Arc<std::sync::RwLock<HashSet<ed25519_dalek::VerifyingKey>>>,
team_root: ed25519_dalek::VerifyingKey,
id: EndpointId,
}
impl NetSender {
pub fn id(&self) -> EndpointId { self.id }
pub fn announce(&self, hash: RawHash) {
let _ = self.cmd_tx.send(NetCommand::Announce(hash));
}
pub fn gossip(&self, branch: RawBranchId, head: RawHash) {
let _ = self.cmd_tx.send(NetCommand::Gossip { branch, head });
}
pub fn update_snapshot(&self, snapshot: impl AnySnapshot) {
let boxed: Box<dyn AnySnapshot> = Box::new(snapshot);
let mut authorised: HashSet<ed25519_dalek::VerifyingKey> =
HashSet::new();
authorised.insert(self.team_root);
let pairs = triblespace_core::repo::capability::extract_revocation_pairs(
boxed.all_simple_archive_blobs(),
);
let scanned: HashSet<ed25519_dalek::VerifyingKey> =
triblespace_core::repo::capability::build_revocation_set(
&authorised, pairs,
);
if !scanned.is_empty() {
let mut guard = self.revoked.write().unwrap();
for k in scanned {
guard.insert(k);
}
}
*self.snapshot.lock().unwrap() = Some(boxed);
}
}
pub struct NetReceiver {
evt_rx: mpsc::Receiver<NetEvent>,
}
impl NetReceiver {
pub fn try_recv(&self) -> Option<NetEvent> {
self.evt_rx.try_recv().ok()
}
}
pub fn spawn(key: SigningKey, config: PeerConfig) -> (NetSender, NetReceiver) {
let secret = iroh_secret(&key);
let id: EndpointId = secret.public().into();
let (cmd_tx, cmd_rx) = mpsc::channel::<NetCommand>();
let (evt_tx, evt_rx) = mpsc::channel::<NetEvent>();
let snapshot: Arc<Mutex<Option<Box<dyn AnySnapshot>>>> =
Arc::new(Mutex::new(None));
let revoked: Arc<std::sync::RwLock<HashSet<ed25519_dalek::VerifyingKey>>> =
Arc::new(std::sync::RwLock::new(config.revoked.clone()));
let team_root = config.team_root;
let thread_snapshot = snapshot.clone();
let thread_revoked = revoked.clone();
let _thread = thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("tokio runtime");
rt.block_on(host_loop(
secret,
config,
cmd_rx,
evt_tx,
thread_snapshot,
thread_revoked,
));
});
let sender = NetSender {
cmd_tx,
snapshot,
revoked,
team_root,
id,
};
let receiver = NetReceiver { evt_rx };
(sender, receiver)
}
#[instrument(level = "info", skip(ep, self_cap), fields(peer = %peer.id.fmt_short()))]
async fn connect_authed(
ep: &iroh::Endpoint,
peer: EndpointAddr,
self_cap: &RawHash,
) -> anyhow::Result<iroh::endpoint::Connection> {
debug!(alpn = %String::from_utf8_lossy(PILE_SYNC_ALPN), "connecting");
let conn = ep.connect(peer, PILE_SYNC_ALPN).await
.map_err(|e| {
warn!(error = %e, "connect failed");
anyhow::anyhow!("connect: {e}")
})?;
debug!(self_cap = %hex::encode(&self_cap[..4]), "connected; sending OP_AUTH");
op_auth(&conn, self_cap).await
.map_err(|e| {
warn!(error = %e, "auth handshake failed");
anyhow::anyhow!("auth: {e}")
})?;
info!("auth ok");
Ok(conn)
}
#[allow(clippy::too_many_arguments)]
async fn host_loop(
secret: iroh_base::SecretKey,
config: PeerConfig,
commands: mpsc::Receiver<NetCommand>,
events: mpsc::Sender<NetEvent>,
snapshot: Arc<Mutex<Option<Box<dyn AnySnapshot>>>>,
revoked: Arc<std::sync::RwLock<HashSet<ed25519_dalek::VerifyingKey>>>,
) {
use iroh::endpoint::presets;
use iroh::protocol::Router;
use iroh::Endpoint;
use iroh_gossip::Gossip;
use iroh_gossip::api::GossipSender;
use futures::TryStreamExt;
let static_lookup =
crate::address_lookup::StaticAddressLookup::new(config.peers.iter().cloned());
let relay_map = dot_stripped_default_relay_map();
let ep = match Endpoint::builder(presets::N0)
.secret_key(secret)
.ca_roots_config(iroh::tls::CaRootsConfig::system())
.address_lookup(static_lookup)
.relay_mode(iroh::RelayMode::Custom(relay_map))
.bind()
.await
{
Ok(ep) => ep,
Err(e) => { error!(error = %e, "iroh endpoint bind failed; net thread exiting"); return; }
};
ep.online().await;
{
use iroh_tickets::endpoint::EndpointTicket;
let local_addr = dot_stripped_endpoint_addr(ep.addr());
let ticket: EndpointTicket = local_addr.into();
eprintln!("ticket: {ticket}");
}
let my_id = ep.id();
let self_cap: RawHash = config.self_cap;
let mut router_builder = Router::builder(ep.clone());
let handler = SnapshotHandler {
snapshot: snapshot.clone(),
team_root: config.team_root,
revoked: revoked.clone(),
};
router_builder = router_builder.accept(PILE_SYNC_ALPN, handler);
let dht_alpn = crate::dht::rpc::ALPN;
let pool = iroh_blobs::util::connection_pool::ConnectionPool::new(
ep.clone(), dht_alpn,
iroh_blobs::util::connection_pool::Options {
max_connections: 64,
idle_timeout: std::time::Duration::from_secs(30),
connect_timeout: std::time::Duration::from_secs(10),
on_connected: None,
},
);
let iroh_pool = crate::dht::pool::IrohPool::new(ep.clone(), pool);
let bootstrap_ids: Vec<EndpointId> =
config.peers.iter().map(|addr| addr.id).collect();
let (rpc, dht_api) = crate::dht::create_node(
my_id, iroh_pool.clone(), bootstrap_ids.clone(), Default::default(),
);
iroh_pool.set_self_client(Some(rpc.downgrade()));
let dht_sender = rpc.inner().as_local().expect("local sender");
router_builder = router_builder
.accept(dht_alpn, irpc_iroh::IrohProtocol::with_sender(dht_sender));
let dht_api = Some(dht_api);
let mut gossip_sender: Option<GossipSender> = None;
if config.gossip {
let gossip = Gossip::builder().spawn(ep.clone());
router_builder = router_builder.accept(iroh_gossip::ALPN, gossip.clone());
let topic_id = iroh_gossip::TopicId::from_bytes(config.team_root.to_bytes());
let topic = gossip.subscribe(topic_id, bootstrap_ids.clone()).await;
if let Ok(topic) = topic {
let (sender, receiver) = topic.split();
gossip_sender = Some(sender);
let events_tx = events.clone();
let ep2 = ep.clone();
let dht_api2 = dht_api.clone();
let snapshot_for_fetch = snapshot.clone();
tokio::spawn(async move {
let mut receiver = receiver;
while let Ok(Some(event)) = receiver.try_next().await {
match &event {
iroh_gossip::api::Event::Received(msg) => {
if msg.content.len() == 81 && msg.content[0] == 0x01 {
let mut branch = [0u8; 16];
branch.copy_from_slice(&msg.content[1..17]);
let mut head = [0u8; 32];
head.copy_from_slice(&msg.content[17..49]);
let mut publisher = [0u8; 32];
publisher.copy_from_slice(&msg.content[49..81]);
let ep2 = ep2.clone();
let events_tx2 = events_tx.clone();
let dht2 = dht_api2.clone();
let self_cap2 = self_cap;
let snap2 = snapshot_for_fetch.clone();
let fetch_peer = if let Ok(pk) = iroh_base::PublicKey::from_bytes(&publisher) {
pk.into()
} else {
msg.delivered_from.into()
};
tokio::spawn(async move {
debug!(
head = %hex::encode(&head[..4]),
publisher = %hex::encode(&publisher[..4]),
"gossip head update; fetching"
);
track_known_head(&ep2, fetch_peer, branch, head, publisher, &dht2, &events_tx2, &self_cap2, &snap2).await;
});
}
}
iroh_gossip::api::Event::NeighborUp(peer) => {
info!(peer = %peer.fmt_short(), "gossip neighbor up");
}
iroh_gossip::api::Event::NeighborDown(peer) => {
info!(peer = %peer.fmt_short(), "gossip neighbor down");
}
_ => {}
}
}
});
}
}
let _router = router_builder.spawn();
fn gossip_frame(branch: &RawBranchId, head: &RawHash, publisher: &EndpointId) -> Vec<u8> {
let mut msg = Vec::with_capacity(81);
msg.push(0x01);
msg.extend_from_slice(branch);
msg.extend_from_slice(head);
msg.extend_from_slice(publisher.as_bytes());
msg
}
let mut last_published: HashMap<RawBranchId, RawHash> = HashMap::new();
let rebroadcast_period = std::time::Duration::from_secs(30);
let mut last_rebroadcast = std::time::Instant::now();
loop {
while let Ok(cmd) = commands.try_recv() {
match cmd {
NetCommand::Announce(hash) => {
if let Some(api) = &dht_api {
let api = api.clone();
tokio::spawn(async move {
let blake3_hash = blake3::Hash::from_bytes(hash);
let _ = api.announce_provider(blake3_hash, my_id).await;
});
}
}
NetCommand::Gossip { branch, head } => {
last_published.insert(branch, head);
if let Some(sender) = &gossip_sender {
let msg = gossip_frame(&branch, &head, &my_id);
let sender = sender.clone();
tokio::spawn(async move {
let _ = sender.broadcast(msg.into()).await;
});
}
}
}
}
if last_rebroadcast.elapsed() >= rebroadcast_period {
if let Some(sender) = &gossip_sender {
for (branch, head) in &last_published {
let msg = gossip_frame(branch, head, &my_id);
let sender = sender.clone();
tokio::spawn(async move {
let _ = sender.broadcast(msg.into()).await;
});
}
}
last_rebroadcast = std::time::Instant::now();
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
async fn fetch_reachable(
ep: &iroh::Endpoint,
publisher: EndpointAddr,
head: &RawHash,
dht: &Option<crate::dht::api::ApiClient>,
events: &mpsc::Sender<NetEvent>,
self_cap: &RawHash,
local: &Arc<Mutex<Option<Box<dyn AnySnapshot>>>>,
) -> anyhow::Result<()> {
let have_local = |hash: &RawHash| -> bool {
local
.lock()
.unwrap()
.as_ref()
.map(|s| s.has_blob(hash))
.unwrap_or(false)
};
if have_local(head) {
return Ok(());
}
let mut pool: HashMap<EndpointId, iroh::endpoint::Connection> = HashMap::new();
let publisher_id = publisher.id;
if let Ok(conn) = connect_authed(ep, publisher, self_cap).await {
pool.insert(publisher_id, conn);
}
let mut seen: HashSet<RawHash> = HashSet::new();
let mut to_fetch: Vec<RawHash> = Vec::new();
let mut frontier: Vec<RawHash> = vec![*head];
seen.insert(*head);
to_fetch.push(*head);
while !frontier.is_empty() {
let mut next: Vec<RawHash> = Vec::new();
for parent in &frontier {
let children = match children_one(ep, parent, dht, &mut pool, publisher_id, self_cap).await {
Some(c) => c,
None => {
warn!(parent = %hex::encode(&parent[..4]), "op_children: no provider could serve");
continue;
}
};
for hash in children {
if !seen.insert(hash) {
continue;
}
if have_local(&hash) {
continue;
}
to_fetch.push(hash);
next.push(hash);
}
}
frontier = next;
}
for hash in to_fetch.iter().rev() {
let Some(data) = fetch_one(ep, hash, dht, &mut pool, publisher_id, self_cap).await else {
debug!(hash = %hex::encode(&hash[..4]), "blob unavailable from all known providers");
continue;
};
if blake3::hash(&data).as_bytes() != hash {
warn!(hash = %hex::encode(&hash[..4]), "hash mismatch on fetched blob");
continue;
}
let _ = events.send(NetEvent::Blob(data));
}
for (_, conn) in pool.drain() {
conn.close(0u32.into(), b"ok");
}
Ok(())
}
async fn providers_for(
hash: &RawHash,
dht: &Option<crate::dht::api::ApiClient>,
publisher_id: EndpointId,
) -> Vec<EndpointId> {
let mut providers: Vec<EndpointId> = if let Some(api) = dht {
let blake3_hash = blake3::Hash::from_bytes(*hash);
api.find_providers(blake3_hash).await.unwrap_or_default()
} else {
Vec::new()
};
if !providers.contains(&publisher_id) {
providers.push(publisher_id);
}
providers
}
async fn pool_get<'a>(
ep: &iroh::Endpoint,
pool: &'a mut HashMap<EndpointId, iroh::endpoint::Connection>,
provider: EndpointId,
self_cap: &RawHash,
) -> Option<&'a iroh::endpoint::Connection> {
if !pool.contains_key(&provider) {
let addr: EndpointAddr = provider.into();
match connect_authed(ep, addr, self_cap).await {
Ok(conn) => {
pool.insert(provider, conn);
}
Err(e) => {
debug!(error = %e, provider = %provider.fmt_short(), "could not auth alt provider");
return None;
}
}
}
pool.get(&provider)
}
async fn fetch_one(
ep: &iroh::Endpoint,
hash: &RawHash,
dht: &Option<crate::dht::api::ApiClient>,
pool: &mut HashMap<EndpointId, iroh::endpoint::Connection>,
publisher_id: EndpointId,
self_cap: &RawHash,
) -> Option<Vec<u8>> {
let providers = providers_for(hash, dht, publisher_id).await;
for provider in providers {
let Some(conn) = pool_get(ep, pool, provider, self_cap).await else {
continue;
};
match op_get_blob(conn, hash).await {
Ok(Some(data)) => return Some(data),
Ok(None) => {
debug!(hash = %hex::encode(&hash[..4]), provider = %provider.fmt_short(), "blob miss");
continue;
}
Err(e) => {
debug!(error = %e, hash = %hex::encode(&hash[..4]), provider = %provider.fmt_short(), "op_get_blob errored, trying next provider");
continue;
}
}
}
None
}
async fn children_one(
ep: &iroh::Endpoint,
parent: &RawHash,
dht: &Option<crate::dht::api::ApiClient>,
pool: &mut HashMap<EndpointId, iroh::endpoint::Connection>,
publisher_id: EndpointId,
self_cap: &RawHash,
) -> Option<Vec<RawHash>> {
let providers = providers_for(parent, dht, publisher_id).await;
for provider in providers {
let Some(conn) = pool_get(ep, pool, provider, self_cap).await else {
continue;
};
match op_children(conn, parent).await {
Ok(c) => return Some(c),
Err(e) => {
debug!(error = %e, parent = %hex::encode(&parent[..4]), provider = %provider.fmt_short(), "op_children errored, trying next provider");
continue;
}
}
}
None
}
async fn track_known_head(
ep: &iroh::Endpoint,
fetch_peer: EndpointAddr,
branch: RawBranchId,
head: RawHash,
publisher: crate::channel::PublisherKey,
dht: &Option<crate::dht::api::ApiClient>,
events: &mpsc::Sender<NetEvent>,
self_cap: &RawHash,
local: &Arc<Mutex<Option<Box<dyn AnySnapshot>>>>,
) {
let fetch_id = fetch_peer.id;
if let Err(e) = fetch_reachable(ep, fetch_peer, &head, dht, events, self_cap, local).await {
warn!(error = %e, peer = %fetch_id.fmt_short(), "fetch_reachable failed");
} else {
let _ = events.send(NetEvent::Head { branch, head, publisher });
}
}
#[derive(Clone)]
struct SnapshotHandler {
snapshot: Arc<Mutex<Option<Box<dyn AnySnapshot>>>>,
team_root: ed25519_dalek::VerifyingKey,
revoked: Arc<std::sync::RwLock<std::collections::HashSet<ed25519_dalek::VerifyingKey>>>,
}
impl std::fmt::Debug for SnapshotHandler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SnapshotHandler").finish()
}
}
impl iroh::protocol::ProtocolHandler for SnapshotHandler {
async fn accept(&self, connection: iroh::endpoint::Connection) -> Result<(), iroh::protocol::AcceptError> {
let snap = self.snapshot.clone();
let team_root = self.team_root;
let revoked = self.revoked.clone();
let peer_endpoint = connection.remote_id();
let span = info_span!(
"connection",
peer = %peer_endpoint.fmt_short(),
alpn = %String::from_utf8_lossy(PILE_SYNC_ALPN),
);
async move {
info!("connection accepted");
let peer_pubkey = match ed25519_dalek::VerifyingKey::from_bytes(
peer_endpoint.as_bytes(),
) {
Ok(k) => k,
Err(e) => {
warn!(error = %e, "peer pubkey parse failed; closing");
return;
}
};
let auth_state: Arc<tokio::sync::RwLock<
Option<triblespace_core::repo::capability::VerifiedCapability>,
>> = Arc::new(tokio::sync::RwLock::new(None));
loop {
let (mut send, mut recv) = match connection.accept_bi().await {
Ok(pair) => pair,
Err(e) => {
debug!(error = %e, "accept_bi ended; connection closing");
break;
}
};
let snap = snap.clone();
let auth_state = auth_state.clone();
let revoked = revoked.clone();
tokio::spawn(
async move {
if let Err(e) = serve_stream(
&snap,
team_root,
peer_pubkey,
auth_state,
revoked,
&mut send,
&mut recv,
).await {
error!(error = %e, "stream handler error");
}
let _ = send.finish();
}
.in_current_span(),
);
}
}
.instrument(span)
.await;
Ok(())
}
}
async fn serve_stream(
snap_arc: &Arc<Mutex<Option<Box<dyn AnySnapshot>>>>,
team_root: ed25519_dalek::VerifyingKey,
peer_pubkey: ed25519_dalek::VerifyingKey,
auth_state: Arc<tokio::sync::RwLock<
Option<triblespace_core::repo::capability::VerifiedCapability>,
>>,
revoked: Arc<std::sync::RwLock<std::collections::HashSet<ed25519_dalek::VerifyingKey>>>,
send: &mut iroh::endpoint::SendStream,
recv: &mut iroh::endpoint::RecvStream,
) -> anyhow::Result<()> {
use triblespace_core::blob::Blob;
use triblespace_core::blob::encodings::simplearchive::SimpleArchive;
use triblespace_core::inline::encodings::hash::Handle;
use triblespace_core::inline::Inline;
let op = recv_u8(recv).await?;
let span = debug_span!("stream", op = op_name(op));
let _enter = span.enter();
if op == OP_AUTH {
let cap_handle_raw = recv_hash(recv).await?;
debug!(cap_handle = %hex::encode(&cap_handle_raw[..4]), "auth: cap handle received");
let cap_handle: Inline<Handle<SimpleArchive>> =
Inline::new(cap_handle_raw);
let revoked_snapshot = revoked.read().unwrap().clone();
let snap_for_fetch = snap_arc.clone();
let result = triblespace_core::repo::capability::verify_chain(
team_root,
cap_handle,
peer_pubkey,
&revoked_snapshot,
move |h: Inline<Handle<SimpleArchive>>| -> Option<Blob<SimpleArchive>> {
let bytes = snap_for_fetch
.lock()
.unwrap()
.as_ref()?
.get_blob(&h.raw)?;
Some(Blob::new(anybytes::Bytes::from_source(bytes)))
},
);
match result {
Ok(verified) => {
let granted = verified
.granted_branches()
.map(|s| s.len())
.unwrap_or(0);
let unrestricted = verified.granted_branches().is_none();
info!(branches = granted, unrestricted = unrestricted, "auth ok");
*auth_state.write().await = Some(verified);
send_u8(send, AUTH_OK).await?;
}
Err(e) => {
warn!(error = ?e, "auth rejected");
send_u8(send, AUTH_REJECTED).await?;
}
}
return Ok(());
}
let verified = match auth_state.read().await.clone() {
Some(v) => v,
None => {
debug!("op without prior OP_AUTH on connection; closing stream");
return Ok(());
}
};
match op {
OP_GET_BLOB => {
let hash = recv_hash(recv).await?;
let in_scope_flag;
let data = {
let guard = snap_arc.lock().unwrap();
let scope_ok = guard.as_ref()
.map(|snap| blob_in_scope(snap.as_ref(), &verified, &hash))
.unwrap_or(false);
in_scope_flag = scope_ok;
guard.as_ref().and_then(|snap| {
if !scope_ok { return None; }
snap.get_blob(&hash)
})
};
match data {
Some(data) => {
debug!(hash = %hex::encode(&hash[..4]), bytes = data.len(), "OP_GET_BLOB served");
send_u64_be(send, data.len() as u64).await?;
send.write_all(&data).await.map_err(|e| anyhow::anyhow!("send: {e}"))?;
}
None => {
if !in_scope_flag {
warn!(hash = %hex::encode(&hash[..4]), "OP_GET_BLOB denied: out of scope");
} else {
debug!(hash = %hex::encode(&hash[..4]), "OP_GET_BLOB miss: blob not present");
}
send_u64_be(send, u64::MAX).await?;
}
}
}
OP_CHILDREN => {
let parent_hash = recv_hash(recv).await?;
let mut parent_in_scope = true;
let mut total_chunks = 0usize;
let children: Vec<RawHash> = {
let guard = snap_arc.lock().unwrap();
match guard.as_ref() {
None => Vec::new(),
Some(snap) => {
let reachable = reachable_set_for(
snap.as_ref(),
&verified,
);
let in_scope = |hash: &RawHash| -> bool {
if !snap.has_blob(hash) {
return false;
}
match &reachable {
None => verified.grants_read(),
Some(set) => set.contains(hash),
}
};
if !in_scope(&parent_hash) {
parent_in_scope = false;
Vec::new()
} else {
match snap.get_blob(&parent_hash) {
None => Vec::new(),
Some(parent_data) => {
let mut result = Vec::new();
for chunk in parent_data.chunks(32) {
if chunk.len() == 32 {
total_chunks += 1;
let mut candidate = [0u8; 32];
candidate.copy_from_slice(chunk);
if in_scope(&candidate) {
result.push(candidate);
}
}
}
result
}
}
}
}
}
};
if !parent_in_scope {
warn!(parent = %hex::encode(&parent_hash[..4]), "OP_CHILDREN denied: parent out of scope");
} else {
debug!(
parent = %hex::encode(&parent_hash[..4]),
candidates = total_chunks,
in_scope = children.len(),
"OP_CHILDREN served"
);
}
for hash in &children {
send_hash(send, hash).await?;
}
send_hash(send, &NIL_HASH).await?;
}
_ => {}
}
Ok(())
}
fn reachable_set_for(
snap: &dyn AnySnapshot,
verified: &triblespace_core::repo::capability::VerifiedCapability,
) -> Option<HashSet<RawHash>> {
if verified.granted_branches().is_none() {
return None;
}
let mut frontier: Vec<RawHash> = snap
.list_branches()
.iter()
.filter_map(|(bid, head)| {
triblespace_core::id::Id::new(*bid)
.filter(|id| verified.grants_read_on(id))
.map(|_| *head)
})
.collect();
let mut reachable: HashSet<RawHash> = HashSet::new();
while let Some(h) = frontier.pop() {
if !reachable.insert(h) {
continue;
}
if let Some(data) = snap.get_blob(&h) {
for chunk in data.chunks(32) {
if chunk.len() == 32 {
let mut child = [0u8; 32];
child.copy_from_slice(chunk);
if snap.has_blob(&child) && !reachable.contains(&child) {
frontier.push(child);
}
}
}
}
}
Some(reachable)
}
fn blob_in_scope(
snap: &dyn AnySnapshot,
verified: &triblespace_core::repo::capability::VerifiedCapability,
hash: &RawHash,
) -> bool {
if !snap.has_blob(hash) {
return false;
}
match reachable_set_for(snap, verified) {
None => verified.grants_read(),
Some(set) => set.contains(hash),
}
}
#[cfg(test)]
mod tests {
use super::*;
use ed25519_dalek::SigningKey;
use rand::rngs::OsRng;
use triblespace_core::blob::Blob;
use triblespace_core::blob::encodings::simplearchive::SimpleArchive;
use triblespace_core::id::{ExclusiveId, ufoid};
use triblespace_core::macros::entity;
use triblespace_core::repo::BlobStorePut;
use triblespace_core::repo::capability::{
VerifyError, build_capability, verify_chain, PERM_READ,
};
use triblespace_core::repo::memoryrepo::MemoryRepo;
use triblespace_core::trible::TribleSet;
use triblespace_core::inline::TryToInline;
use triblespace_core::inline::Inline;
use triblespace_core::inline::encodings::hash::Handle;
use triblespace_core::inline::encodings::time::NsTAIInterval;
use hifitime::Epoch;
fn now_plus_24h() -> Inline<NsTAIInterval> {
let now = Epoch::now().expect("system time");
let later = now + hifitime::Duration::from_seconds(24.0 * 3600.0);
(now, later).try_to_inline().expect("valid interval")
}
fn empty_scope() -> (triblespace_core::id::Id, TribleSet) {
let scope_root = ufoid();
let facts = entity! { ExclusiveId::force_ref(&scope_root) @
triblespace_core::metadata::tag: PERM_READ,
};
(*scope_root, TribleSet::from(facts))
}
fn snapshot_with_blobs(
blobs: &[Blob<SimpleArchive>],
) -> Box<dyn AnySnapshot> {
let mut store = MemoryRepo::default();
for blob in blobs {
store
.put::<SimpleArchive, _>(blob.clone())
.expect("put blob");
}
Box::new(StoreSnapshot::from_store(&mut store).expect("snapshot"))
}
fn fetch_via_snapshot(
snap: &Arc<Mutex<Option<Box<dyn AnySnapshot>>>>,
) -> impl FnMut(Inline<Handle<SimpleArchive>>) -> Option<Blob<SimpleArchive>>
{
let snap = snap.clone();
move |h: Inline<Handle<SimpleArchive>>| -> Option<Blob<SimpleArchive>> {
let bytes = snap.lock().unwrap().as_ref()?.get_blob(&h.raw)?;
Some(Blob::new(anybytes::Bytes::from_source(bytes)))
}
}
#[test]
fn snapshot_lookup_serves_a_valid_cap_chain_to_verify_chain() {
let team_root = SigningKey::generate(&mut OsRng);
let founder = SigningKey::generate(&mut OsRng);
let (scope_root, scope_facts) = empty_scope();
let (cap_blob, sig_blob) = build_capability(
&team_root,
founder.verifying_key(),
None,
scope_root,
scope_facts,
now_plus_24h(),
)
.expect("cap builds");
let sig_handle: Inline<Handle<SimpleArchive>> =
(&sig_blob).get_handle();
let snap_box = snapshot_with_blobs(&[cap_blob, sig_blob]);
let snap_arc: Arc<Mutex<Option<Box<dyn AnySnapshot>>>> =
Arc::new(Mutex::new(Some(snap_box)));
let revoked = HashSet::new();
let result = verify_chain(
team_root.verifying_key(),
sig_handle,
founder.verifying_key(),
&revoked,
fetch_via_snapshot(&snap_arc),
);
let verified = result.expect("snapshot served chain to verifier; chain valid");
assert_eq!(verified.subject, founder.verifying_key());
assert_eq!(verified.scope_root, scope_root);
}
#[test]
fn snapshot_lookup_rejects_unknown_handle_as_chain_break() {
let team_root = SigningKey::generate(&mut OsRng);
let founder = SigningKey::generate(&mut OsRng);
let snap_arc: Arc<Mutex<Option<Box<dyn AnySnapshot>>>> =
Arc::new(Mutex::new(Some(snapshot_with_blobs(&[]))));
let zero_handle: Inline<Handle<SimpleArchive>> =
Inline::new([0u8; 32]);
let revoked = HashSet::new();
let result = verify_chain(
team_root.verifying_key(),
zero_handle,
founder.verifying_key(),
&revoked,
fetch_via_snapshot(&snap_arc),
);
assert!(
matches!(result, Err(VerifyError::Fetch)),
"unknown handle must surface as Fetch; got {:?}",
result,
);
}
fn manual_verified_cap(
scope_root: triblespace_core::id::Id,
permissions: &[triblespace_core::id::Id],
branches: &[triblespace_core::id::Id],
) -> triblespace_core::repo::capability::VerifiedCapability {
let mut cap_set = TribleSet::new();
for perm in permissions {
cap_set += TribleSet::from(entity! {
ExclusiveId::force_ref(&scope_root) @
triblespace_core::metadata::tag: *perm,
});
}
for b in branches {
cap_set += TribleSet::from(entity! {
ExclusiveId::force_ref(&scope_root) @
triblespace_core::repo::capability::scope_branch: *b,
});
}
let dummy_subject = SigningKey::generate(&mut OsRng).verifying_key();
triblespace_core::repo::capability::VerifiedCapability {
subject: dummy_subject,
scope_root,
cap_set,
}
}
fn two_branch_snapshot() -> (
Box<dyn AnySnapshot>,
triblespace_core::id::Id,
triblespace_core::id::Id,
RawHash,
RawHash,
RawHash,
RawHash,
) {
use triblespace_core::blob::encodings::UnknownBlob;
use triblespace_core::repo::BranchStore;
let mut store = MemoryRepo::default();
let leaf_a_bytes = anybytes::Bytes::from_source(b"leaf_a".to_vec());
let leaf_a = store.put::<UnknownBlob, _>(leaf_a_bytes).unwrap();
let leaf_b_bytes = anybytes::Bytes::from_source(b"leaf_b".to_vec());
let leaf_b = store.put::<UnknownBlob, _>(leaf_b_bytes).unwrap();
let head_a_bytes = anybytes::Bytes::from_source(leaf_a.raw.to_vec());
let head_a = store.put::<UnknownBlob, _>(head_a_bytes).unwrap();
let head_b_bytes = anybytes::Bytes::from_source(leaf_b.raw.to_vec());
let head_b = store.put::<UnknownBlob, _>(head_b_bytes).unwrap();
let branch_a = ufoid();
let branch_b = ufoid();
let head_a_simple: Inline<Handle<SimpleArchive>> =
Inline::new(head_a.raw);
let head_b_simple: Inline<Handle<SimpleArchive>> =
Inline::new(head_b.raw);
store.update(*branch_a, None, Some(head_a_simple)).unwrap();
store.update(*branch_b, None, Some(head_b_simple)).unwrap();
let snap: Box<dyn AnySnapshot> =
Box::new(StoreSnapshot::from_store(&mut store).expect("snapshot"));
(snap, *branch_a, *branch_b, head_a.raw, leaf_a.raw, head_b.raw, leaf_b.raw)
}
#[test]
fn blob_in_scope_filters_by_branch_reachability() {
let (snap, branch_a, _branch_b, head_a, leaf_a, head_b, leaf_b) =
two_branch_snapshot();
let scope_root = *ufoid();
let verified =
manual_verified_cap(scope_root, &[PERM_READ], &[branch_a]);
assert!(
blob_in_scope(snap.as_ref(), &verified, &head_a),
"head reachable from allowed branch is in scope",
);
assert!(
blob_in_scope(snap.as_ref(), &verified, &leaf_a),
"leaf reachable from allowed branch is in scope",
);
assert!(
!blob_in_scope(snap.as_ref(), &verified, &head_b),
"head of disallowed branch is out of scope",
);
assert!(
!blob_in_scope(snap.as_ref(), &verified, &leaf_b),
"leaf reachable only from disallowed branch is out of scope",
);
}
#[test]
fn blob_in_scope_unrestricted_admits_any_present_blob() {
let (snap, _branch_a, _branch_b, head_a, _leaf_a, head_b, _leaf_b) =
two_branch_snapshot();
let scope_root = *ufoid();
let verified = manual_verified_cap(scope_root, &[PERM_READ], &[]);
assert!(blob_in_scope(snap.as_ref(), &verified, &head_a));
assert!(
blob_in_scope(snap.as_ref(), &verified, &head_b),
"unrestricted cap admits all branches' heads",
);
let absent = [0xFFu8; 32];
assert!(
!blob_in_scope(snap.as_ref(), &verified, &absent),
"blobs absent from the snapshot are never in scope",
);
}
#[test]
fn blob_in_scope_with_no_read_permission_admits_nothing() {
let (snap, branch_a, _branch_b, head_a, _leaf_a, _head_b, _leaf_b) =
two_branch_snapshot();
let scope_root = *ufoid();
let verified = manual_verified_cap(scope_root, &[], &[branch_a]);
assert!(
!blob_in_scope(snap.as_ref(), &verified, &head_a),
"cap without read permission cannot reach any blob, even of \
a notionally-allowed branch",
);
}
#[test]
fn update_snapshot_picks_up_team_root_signed_revocations() {
use std::sync::mpsc as std_mpsc;
use triblespace_core::repo::capability::build_revocation;
let team_root = SigningKey::generate(&mut OsRng);
let target = SigningKey::generate(&mut OsRng);
let (rev_blob, rev_sig_blob) =
build_revocation(&team_root, target.verifying_key());
let (cmd_tx, _cmd_rx) = std_mpsc::channel::<NetCommand>();
let snapshot_arc: Arc<Mutex<Option<Box<dyn AnySnapshot>>>> =
Arc::new(Mutex::new(None));
let revoked_arc: Arc<
std::sync::RwLock<HashSet<ed25519_dalek::VerifyingKey>>,
> = Arc::new(std::sync::RwLock::new(HashSet::new()));
let dummy_secret = iroh_secret(&SigningKey::generate(&mut OsRng));
let dummy_id: EndpointId = dummy_secret.public().into();
let sender = NetSender {
cmd_tx,
snapshot: snapshot_arc.clone(),
revoked: revoked_arc.clone(),
team_root: team_root.verifying_key(),
id: dummy_id,
};
let snap = snapshot_with_blobs(&[rev_blob, rev_sig_blob]);
let snap: Box<dyn AnySnapshot> = snap;
assert!(revoked_arc.read().unwrap().is_empty());
sender.update_snapshot(BoxedSnap(snap));
let revoked_after = revoked_arc.read().unwrap();
assert!(
revoked_after.contains(&target.verifying_key()),
"target pubkey appears in revoked set after update_snapshot",
);
assert_eq!(
revoked_after.len(),
1,
"exactly one new revocation, not duplicates",
);
}
#[test]
fn update_snapshot_ignores_bystander_signed_revocations() {
use std::sync::mpsc as std_mpsc;
use triblespace_core::repo::capability::build_revocation;
let team_root = SigningKey::generate(&mut OsRng);
let bystander = SigningKey::generate(&mut OsRng);
let target = SigningKey::generate(&mut OsRng);
let (rev_blob, rev_sig_blob) =
build_revocation(&bystander, target.verifying_key());
let (cmd_tx, _cmd_rx) = std_mpsc::channel::<NetCommand>();
let snapshot_arc: Arc<Mutex<Option<Box<dyn AnySnapshot>>>> =
Arc::new(Mutex::new(None));
let revoked_arc: Arc<
std::sync::RwLock<HashSet<ed25519_dalek::VerifyingKey>>,
> = Arc::new(std::sync::RwLock::new(HashSet::new()));
let dummy_secret = iroh_secret(&SigningKey::generate(&mut OsRng));
let dummy_id: EndpointId = dummy_secret.public().into();
let sender = NetSender {
cmd_tx,
snapshot: snapshot_arc,
revoked: revoked_arc.clone(),
team_root: team_root.verifying_key(),
id: dummy_id,
};
let snap = snapshot_with_blobs(&[rev_blob, rev_sig_blob]);
sender.update_snapshot(BoxedSnap(snap));
assert!(
revoked_arc.read().unwrap().is_empty(),
"bystander-signed revocation must not propagate into the \
relay's revoked set",
);
}
struct BoxedSnap(Box<dyn AnySnapshot>);
impl AnySnapshot for BoxedSnap {
fn get_blob(&self, hash: &RawHash) -> Option<Vec<u8>> {
self.0.get_blob(hash)
}
fn has_blob(&self, hash: &RawHash) -> bool {
self.0.has_blob(hash)
}
fn list_branches(&self) -> &[(RawBranchId, RawHash)] {
self.0.list_branches()
}
fn all_simple_archive_blobs(
&self,
) -> Vec<triblespace_core::blob::Blob<
triblespace_core::blob::encodings::simplearchive::SimpleArchive,
>> {
self.0.all_simple_archive_blobs()
}
}
#[test]
fn snapshot_lookup_rejects_chain_signed_by_a_foreign_root() {
let real_team_root = SigningKey::generate(&mut OsRng);
let fake_team_root = SigningKey::generate(&mut OsRng);
let founder = SigningKey::generate(&mut OsRng);
let (scope_root, scope_facts) = empty_scope();
let (cap_blob, sig_blob) = build_capability(
&fake_team_root,
founder.verifying_key(),
None,
scope_root,
scope_facts,
now_plus_24h(),
)
.expect("cap builds");
let sig_handle: Inline<Handle<SimpleArchive>> =
(&sig_blob).get_handle();
let snap_box = snapshot_with_blobs(&[cap_blob, sig_blob]);
let snap_arc: Arc<Mutex<Option<Box<dyn AnySnapshot>>>> =
Arc::new(Mutex::new(Some(snap_box)));
let revoked = HashSet::new();
let result = verify_chain(
real_team_root.verifying_key(),
sig_handle,
founder.verifying_key(),
&revoked,
fetch_via_snapshot(&snap_arc),
);
assert!(
result.is_err(),
"chain signed by a foreign root must fail verification; got {:?}",
result,
);
}
async fn build_endpoint_on_test_network(
secret: iroh_base::SecretKey,
transport: std::sync::Arc<
iroh::test_utils::test_transport::TestTransport,
>,
) -> iroh::Endpoint {
use iroh::endpoint::presets;
iroh::Endpoint::builder(presets::N0)
.secret_key(secret)
.relay_mode(iroh::RelayMode::Disabled)
.ca_roots_config(iroh::tls::CaRootsConfig::insecure_skip_verify())
.add_custom_transport(transport)
.clear_ip_transports()
.bind()
.await
.expect("bind endpoint on TestNetwork")
}
async fn dial_against_auth_server(
team_root: ed25519_dalek::VerifyingKey,
cap_blob: Blob<SimpleArchive>,
sig_blob: Blob<SimpleArchive>,
client_signing: &SigningKey,
) -> (
iroh::protocol::Router,
iroh::Endpoint,
iroh::endpoint::Connection,
) {
use iroh::test_utils::test_transport::{TestNetwork, to_custom_addr};
let network = TestNetwork::new();
let server_secret = iroh_secret(&SigningKey::generate(&mut OsRng));
let client_secret = iroh_secret(client_signing);
let server_id = server_secret.public();
let client_id = client_secret.public();
let server_transport = network
.create_transport(server_id)
.expect("create server transport");
let client_transport = network
.create_transport(client_id)
.expect("create client transport");
let server_ep =
build_endpoint_on_test_network(server_secret, server_transport).await;
let client_ep =
build_endpoint_on_test_network(client_secret, client_transport).await;
let snap = snapshot_with_blobs(&[cap_blob, sig_blob]);
let snap_arc: Arc<Mutex<Option<Box<dyn AnySnapshot>>>> =
Arc::new(Mutex::new(Some(snap)));
let revoked: Arc<
std::sync::RwLock<HashSet<ed25519_dalek::VerifyingKey>>,
> = Arc::new(std::sync::RwLock::new(HashSet::new()));
let handler = SnapshotHandler {
snapshot: snap_arc,
team_root,
revoked,
};
let router = iroh::protocol::Router::builder(server_ep)
.accept(PILE_SYNC_ALPN, handler)
.spawn();
let server_addr = iroh_base::EndpointAddr::from_parts(
server_id,
std::iter::once(iroh_base::TransportAddr::Custom(
to_custom_addr(server_id),
)),
);
let conn = client_ep
.connect(server_addr, PILE_SYNC_ALPN)
.await
.expect("client connect");
(router, client_ep, conn)
}
#[tokio::test]
async fn e2e_smoke_echo_over_test_network() {
use iroh::test_utils::test_transport::TestNetwork;
use iroh::protocol::{ProtocolHandler, Router, AcceptError};
use iroh::endpoint::Connection;
const ECHO_ALPN: &[u8] = b"smoke/echo/1";
#[derive(Debug, Clone)]
struct Echo;
impl ProtocolHandler for Echo {
async fn accept(&self, conn: Connection) -> Result<(), AcceptError> {
let (mut send, mut recv) = conn.accept_bi().await?;
tokio::io::copy(&mut recv, &mut send).await?;
send.finish()?;
conn.closed().await;
Ok(())
}
}
let network = TestNetwork::new();
let s_server = iroh_secret(&SigningKey::generate(&mut OsRng));
let s_client = iroh_secret(&SigningKey::generate(&mut OsRng));
let server_id = s_server.public();
let client_id = s_client.public();
let t_server = network.create_transport(server_id).unwrap();
let t_client = network.create_transport(client_id).unwrap();
let ep_server = build_endpoint_on_test_network(s_server, t_server).await;
let ep_client = build_endpoint_on_test_network(s_client, t_client).await;
let router = Router::builder(ep_server).accept(ECHO_ALPN, Echo).spawn();
use iroh::test_utils::test_transport::to_custom_addr;
let server_addr = iroh_base::EndpointAddr::from_parts(
server_id,
std::iter::once(iroh_base::TransportAddr::Custom(
to_custom_addr(server_id),
)),
);
let conn = ep_client.connect(server_addr, ECHO_ALPN).await
.expect("client connect");
let (mut send, mut recv) = conn.open_bi().await.unwrap();
send.write_all(b"hello").await.unwrap();
send.finish().unwrap();
let response = recv.read_to_end(100).await.unwrap();
assert_eq!(response, b"hello");
let _ = router.shutdown().await;
}
#[tokio::test]
async fn e2e_auth_handshake_accepts_valid_cap() {
let team_root = SigningKey::generate(&mut OsRng);
let founder = SigningKey::generate(&mut OsRng);
let (scope_root, scope_facts) = empty_scope();
let (cap_blob, sig_blob) = build_capability(
&team_root,
founder.verifying_key(),
None,
scope_root,
scope_facts,
now_plus_24h(),
)
.expect("cap builds");
let sig_handle: Inline<Handle<SimpleArchive>> =
(&sig_blob).get_handle();
let (router, _client_ep, conn) = dial_against_auth_server(
team_root.verifying_key(),
cap_blob,
sig_blob,
&founder,
)
.await;
crate::protocol::op_auth(&conn, &sig_handle.raw)
.await
.expect("server accepts cap chained from configured team root");
let _ = router.shutdown().await;
}
#[tokio::test]
async fn e2e_auth_handshake_rejects_zero_cap() {
let team_root = SigningKey::generate(&mut OsRng);
let founder = SigningKey::generate(&mut OsRng);
let (scope_root, scope_facts) = empty_scope();
let (cap_blob, sig_blob) = build_capability(
&team_root,
founder.verifying_key(),
None,
scope_root,
scope_facts,
now_plus_24h(),
)
.expect("cap builds");
let (router, _client_ep, conn) = dial_against_auth_server(
team_root.verifying_key(),
cap_blob,
sig_blob,
&founder,
)
.await;
let zero_handle = [0u8; 32];
let result = crate::protocol::op_auth(&conn, &zero_handle).await;
let err = result.expect_err("zero handle must be rejected");
let msg = format!("{err}");
assert!(
msg.contains("rejected capability"),
"expected explicit rejection over the wire, got: {msg}",
);
let _ = router.shutdown().await;
}
#[tokio::test]
async fn e2e_auth_handshake_rejects_chain_signed_by_foreign_root() {
let real_team_root = SigningKey::generate(&mut OsRng);
let fake_team_root = SigningKey::generate(&mut OsRng);
let founder = SigningKey::generate(&mut OsRng);
let (scope_root, scope_facts) = empty_scope();
let (cap_blob, sig_blob) = build_capability(
&fake_team_root,
founder.verifying_key(),
None,
scope_root,
scope_facts,
now_plus_24h(),
)
.expect("cap builds");
let sig_handle: Inline<Handle<SimpleArchive>> =
(&sig_blob).get_handle();
let (router, _client_ep, conn) = dial_against_auth_server(
real_team_root.verifying_key(),
cap_blob,
sig_blob,
&founder,
)
.await;
let result = crate::protocol::op_auth(&conn, &sig_handle.raw).await;
let err = result.expect_err("foreign-root cap must be rejected");
let msg = format!("{err}");
assert!(
msg.contains("rejected capability"),
"expected explicit rejection over the wire, got: {msg}",
);
let _ = router.shutdown().await;
}
}