#![allow(clippy::unwrap_used)]
#![allow(clippy::expect_used)]
#![allow(missing_docs)]
pub mod error;
pub mod identity;
pub mod storage;
pub mod bootstrap;
pub mod network;
pub mod contacts;
pub mod trust;
pub mod connectivity;
pub mod gossip;
pub mod crdt;
pub mod kv;
pub mod groups;
pub mod mls;
pub mod direct;
pub mod presence;
pub mod upgrade;
pub mod files;
pub mod constitution;
pub mod api;
pub mod cli;
pub use gossip::{
GossipConfig, GossipRuntime, PubSubManager, PubSubMessage, SigningContext, Subscription,
};
pub use direct::{DirectMessage, DirectMessageReceiver, DirectMessaging};
use saorsa_gossip_membership::Membership as _;
/// Top-level handle for one agent: its identity plus the optional
/// networking, gossip, discovery, direct-messaging and presence subsystems.
pub struct Agent {
    /// Shared identity (machine and agent keys, optional user identity).
    identity: std::sync::Arc<identity::Identity>,
    /// Network node; `None` when no network is configured.
    #[allow(dead_code)]
    network: Option<std::sync::Arc<network::NetworkNode>>,
    /// Gossip runtime used for pub/sub; required by the announce/discovery APIs.
    gossip_runtime: Option<std::sync::Arc<gossip::GossipRuntime>>,
    /// Persistent cache of known peers; saved on `shutdown`.
    bootstrap_cache: Option<std::sync::Arc<ant_quic::BootstrapCache>>,
    /// Adapter exposed through `gossip_cache_adapter()`.
    gossip_cache_adapter: Option<saorsa_gossip_coordinator::GossipCacheAdapter>,
    /// Agents learned from identity announcements, keyed by agent id.
    identity_discovery_cache: std::sync::Arc<
        tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
    >,
    /// Set once the identity listener task has been spawned.
    identity_listener_started: std::sync::atomic::AtomicBool,
    /// Heartbeat period in seconds (the heartbeat loop is outside this view).
    heartbeat_interval_secs: u64,
    /// Age in seconds after which `discovered_agents` hides an entry.
    identity_ttl_secs: u64,
    /// Handle of the heartbeat task, if any (spawn site is outside this view).
    heartbeat_handle: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// NOTE(review): presumably records that a rendezvous advert was sent — confirm.
    rendezvous_advertised: std::sync::atomic::AtomicBool,
    /// Contact/trust store consulted when filtering messages and announcements.
    contact_store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>,
    /// Direct (point-to-point) messaging state.
    direct_messaging: std::sync::Arc<direct::DirectMessaging>,
    /// NOTE(review): presumably set once the direct listener is started — confirm.
    direct_listener_started: std::sync::atomic::AtomicBool,
    /// Presence subsystem; shut down in `shutdown` when present.
    presence: Option<std::sync::Arc<presence::PresenceWrapper>>,
    /// Set after a consented user-identity announcement; read by the heartbeat
    /// to decide whether to keep including the user identity.
    user_identity_consented: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
/// Compact `Debug` output: prints the identity and, for each optional
/// subsystem listed, only whether it is present.
impl std::fmt::Debug for Agent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Reduce the heavyweight handles to presence flags up front.
        let has_network = self.network.is_some();
        let has_gossip = self.gossip_runtime.is_some();
        let has_bootstrap_cache = self.bootstrap_cache.is_some();
        let has_cache_adapter = self.gossip_cache_adapter.is_some();

        let mut out = f.debug_struct("Agent");
        out.field("identity", &self.identity);
        out.field("network", &has_network);
        out.field("gossip_runtime", &has_gossip);
        out.field("bootstrap_cache", &has_bootstrap_cache);
        out.field("gossip_cache_adapter", &has_cache_adapter);
        out.finish()
    }
}
/// A pub/sub message as surfaced to callers.
#[derive(Debug, Clone)]
pub struct Message {
    /// Originator of the message, as a string (exact format not visible here).
    pub origin: String,
    /// Raw message bytes.
    pub payload: Vec<u8>,
    /// Topic the message belongs to.
    pub topic: String,
}
/// Gossip topic on which identity announcements are published and received.
pub const IDENTITY_ANNOUNCE_TOPIC: &str = "x0x.identity.announce.v1";
/// Returns the identity shard topic for `agent_id`, of the form
/// `x0x.identity.shard.<shard>`, where `<shard>` is computed from the agent id
/// by `saorsa_gossip_rendezvous::calculate_shard`.
#[must_use]
pub fn shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
    format!(
        "x0x.identity.shard.{shard}",
        shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0)
    )
}
/// Prefix of rendezvous shard topics; the shard number is appended after a `.`.
pub const RENDEZVOUS_SHARD_TOPIC_PREFIX: &str = "x0x.rendezvous.shard";
/// Returns the rendezvous shard topic for `agent_id`:
/// [`RENDEZVOUS_SHARD_TOPIC_PREFIX`], a `.`, then the shard computed from the
/// agent id by `saorsa_gossip_rendezvous::calculate_shard`.
#[must_use]
pub fn rendezvous_shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
    format!(
        "{RENDEZVOUS_SHARD_TOPIC_PREFIX}.{shard}",
        shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0)
    )
}
/// Returns `true` when `ip` looks like a publicly routable unicast address.
///
/// Rejected IPv4 ranges: private (RFC 1918), loopback, link-local,
/// unspecified, broadcast, documentation, and the CGNAT shared space
/// `100.64.0.0/10`.
///
/// Rejected IPv6 ranges: loopback, unspecified, link-local `fe80::/10`,
/// unique-local `fc00::/7`, and the deprecated site-local `fec0::/10`.
/// IPv4-mapped addresses (`::ffff:a.b.c.d`) are judged by their embedded IPv4
/// address, so a private IPv4 address cannot slip through in IPv6 form.
fn is_globally_routable(ip: std::net::IpAddr) -> bool {
    match ip {
        std::net::IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            // 100.64.0.0/10: first octet 100, top two bits of the second are `01`.
            let is_cgnat = first == 100 && (second & 0xC0) == 64;
            !(v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_broadcast()
                || v4.is_documentation()
                || is_cgnat)
        }
        std::net::IpAddr::V6(v6) => {
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_globally_routable(std::net::IpAddr::V4(mapped));
            }
            let prefix = v6.segments()[0];
            let is_link_local = (prefix & 0xffc0) == 0xfe80;
            let is_unique_local = (prefix & 0xfe00) == 0xfc00;
            // Site-local is a /10, so the mask must be 0xffc0; a 0xfff0 mask
            // would only cover fec0::/12 and let fed0::..feff:: through.
            let is_site_local = (prefix & 0xffc0) == 0xfec0;
            !(v6.is_loopback()
                || v6.is_unspecified()
                || is_link_local
                || is_unique_local
                || is_site_local)
        }
    }
}
/// Identity heartbeat interval in seconds (300 s = 5 minutes).
pub const IDENTITY_HEARTBEAT_INTERVAL_SECS: u64 = 300;
/// Identity time-to-live in seconds (900 s = 15 minutes, three heartbeat intervals).
pub const IDENTITY_TTL_SECS: u64 = 900;
/// The signed portion of an [`IdentityAnnouncement`]: every field except the
/// signature itself. Its bincode serialization is what gets signed and
/// verified, so field order and types here are part of the wire contract.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct IdentityAnnouncementUnsigned {
    /// Announcing agent.
    agent_id: identity::AgentId,
    /// Machine the agent runs on.
    machine_id: identity::MachineId,
    /// User identity, only present when disclosure was consented to.
    user_id: Option<identity::UserId>,
    /// Certificate binding the agent to the user; accompanies `user_id`.
    agent_certificate: Option<identity::AgentCertificate>,
    /// Raw bytes of the machine public key used for signing.
    machine_public_key: Vec<u8>,
    /// Socket addresses the agent advertises.
    addresses: Vec<std::net::SocketAddr>,
    /// Sender-side Unix timestamp (seconds) of the announcement.
    announced_at: u64,
    /// NAT type as a string, when node status was available.
    nat_type: Option<String>,
    /// Whether the node reports it can receive direct connections.
    can_receive_direct: Option<bool>,
    /// Whether the node reports it is relaying.
    is_relay: Option<bool>,
    /// Whether the node reports it is coordinating.
    is_coordinator: Option<bool>,
}
/// A signed identity announcement as published on the gossip topics.
///
/// Serialized with bincode, so field order and types are part of the wire
/// format. Use [`IdentityAnnouncement::verify`] before trusting any field.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IdentityAnnouncement {
    /// Announcing agent.
    pub agent_id: identity::AgentId,
    /// Machine the agent runs on; must match `machine_public_key`.
    pub machine_id: identity::MachineId,
    /// User identity, only present together with `agent_certificate`.
    pub user_id: Option<identity::UserId>,
    /// Certificate binding the agent to the user.
    pub agent_certificate: Option<identity::AgentCertificate>,
    /// Raw bytes of the machine public key.
    pub machine_public_key: Vec<u8>,
    /// Machine-key signature over the bincode encoding of the unsigned fields.
    pub machine_signature: Vec<u8>,
    /// Socket addresses the agent advertises.
    pub addresses: Vec<std::net::SocketAddr>,
    /// Sender-side Unix timestamp (seconds) of the announcement.
    pub announced_at: u64,
    /// NAT type as a string, when known.
    pub nat_type: Option<String>,
    /// Whether the sender reports it can receive direct connections.
    pub can_receive_direct: Option<bool>,
    /// Whether the sender reports it is relaying.
    pub is_relay: Option<bool>,
    /// Whether the sender reports it is coordinating.
    pub is_coordinator: Option<bool>,
}
impl IdentityAnnouncement {
fn to_unsigned(&self) -> IdentityAnnouncementUnsigned {
IdentityAnnouncementUnsigned {
agent_id: self.agent_id,
machine_id: self.machine_id,
user_id: self.user_id,
agent_certificate: self.agent_certificate.clone(),
machine_public_key: self.machine_public_key.clone(),
addresses: self.addresses.clone(),
announced_at: self.announced_at,
nat_type: self.nat_type.clone(),
can_receive_direct: self.can_receive_direct,
is_relay: self.is_relay,
is_coordinator: self.is_coordinator,
}
}
pub fn verify(&self) -> error::Result<()> {
let machine_pub =
ant_quic::MlDsaPublicKey::from_bytes(&self.machine_public_key).map_err(|_| {
error::IdentityError::CertificateVerification(
"invalid machine public key in announcement".to_string(),
)
})?;
let derived_machine_id = identity::MachineId::from_public_key(&machine_pub);
if derived_machine_id != self.machine_id {
return Err(error::IdentityError::CertificateVerification(
"machine_id does not match machine public key".to_string(),
));
}
let unsigned_bytes = bincode::serialize(&self.to_unsigned()).map_err(|e| {
error::IdentityError::Serialization(format!(
"failed to serialize announcement for verification: {e}"
))
})?;
let signature = ant_quic::crypto::raw_public_keys::pqc::MlDsaSignature::from_bytes(
&self.machine_signature,
)
.map_err(|e| {
error::IdentityError::CertificateVerification(format!(
"invalid machine signature in announcement: {:?}",
e
))
})?;
ant_quic::crypto::raw_public_keys::pqc::verify_with_ml_dsa(
&machine_pub,
&unsigned_bytes,
&signature,
)
.map_err(|e| {
error::IdentityError::CertificateVerification(format!(
"machine signature verification failed: {:?}",
e
))
})?;
match (self.user_id, self.agent_certificate.as_ref()) {
(Some(user_id), Some(cert)) => {
cert.verify()?;
let cert_agent_id = cert.agent_id()?;
if cert_agent_id != self.agent_id {
return Err(error::IdentityError::CertificateVerification(
"agent certificate agent_id mismatch".to_string(),
));
}
let cert_user_id = cert.user_id()?;
if cert_user_id != user_id {
return Err(error::IdentityError::CertificateVerification(
"agent certificate user_id mismatch".to_string(),
));
}
Ok(())
}
(None, None) => Ok(()),
_ => Err(error::IdentityError::CertificateVerification(
"user identity disclosure requires matching certificate".to_string(),
)),
}
}
}
/// Discovery-cache entry describing an agent learned from an identity
/// announcement (or this agent's own announcement).
#[derive(Debug, Clone)]
pub struct DiscoveredAgent {
    /// The agent's id.
    pub agent_id: identity::AgentId,
    /// Machine id; may be overwritten with the peer id observed on connect.
    pub machine_id: identity::MachineId,
    /// User id, when the agent disclosed one.
    pub user_id: Option<identity::UserId>,
    /// Addresses advertised in the announcement.
    pub addresses: Vec<std::net::SocketAddr>,
    /// Sender-side Unix timestamp (seconds) from the announcement.
    pub announced_at: u64,
    /// Local Unix timestamp (seconds) at which the entry was last written.
    pub last_seen: u64,
    /// Raw machine public key bytes from the announcement.
    #[doc(hidden)]
    pub machine_public_key: Vec<u8>,
    /// NAT type reported by the agent, if any.
    pub nat_type: Option<String>,
    /// Whether the agent reported it can receive direct connections.
    pub can_receive_direct: Option<bool>,
    /// Whether the agent reported it is relaying.
    pub is_relay: Option<bool>,
    /// Whether the agent reported it is coordinating.
    pub is_coordinator: Option<bool>,
}
/// Builder for [`Agent`]; obtained from [`Agent::builder`] with every option
/// unset. The `build` step that consumes these fields is outside this view,
/// so the field notes below describe intent as suggested by names only.
#[derive(Debug)]
pub struct AgentBuilder {
    /// Optional path for the machine key file.
    machine_key_path: Option<std::path::PathBuf>,
    /// Optional pre-supplied agent keypair.
    agent_keypair: Option<identity::AgentKeypair>,
    /// Optional path for the agent key file.
    agent_key_path: Option<std::path::PathBuf>,
    /// Optional pre-supplied user keypair.
    user_keypair: Option<identity::UserKeypair>,
    /// Optional path for the user key file.
    user_key_path: Option<std::path::PathBuf>,
    /// Optional network configuration.
    #[allow(dead_code)]
    network_config: Option<network::NetworkConfig>,
    /// Optional directory for the peer cache.
    peer_cache_dir: Option<std::path::PathBuf>,
    /// When `true`, the peer cache is disabled (starts `false`).
    disable_peer_cache: bool,
    /// Optional override for the heartbeat interval, in seconds.
    heartbeat_interval_secs: Option<u64>,
    /// Optional override for the identity TTL, in seconds.
    identity_ttl_secs: Option<u64>,
    /// Optional path for the contact store.
    contact_store_path: Option<std::path::PathBuf>,
}
/// Everything the heartbeat needs to re-announce this agent's identity
/// without borrowing the [`Agent`] itself.
struct HeartbeatContext {
    /// Identity whose machine key signs each announcement.
    identity: std::sync::Arc<identity::Identity>,
    /// Gossip runtime used to publish the announcement.
    runtime: std::sync::Arc<gossip::GossipRuntime>,
    /// Network node queried for addresses and NAT status.
    network: std::sync::Arc<network::NetworkNode>,
    /// Heartbeat period in seconds (consumed by the loop, outside this view).
    interval_secs: u64,
    /// Discovery cache, updated with this agent's own entry after publishing.
    cache: std::sync::Arc<
        tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
    >,
    /// When set, announcements include the user id and agent certificate.
    user_identity_consented: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl HeartbeatContext {
async fn announce(&self) -> error::Result<()> {
let machine_public_key = self
.identity
.machine_keypair()
.public_key()
.as_bytes()
.to_vec();
let announced_at = Agent::unix_timestamp_secs();
let mut addresses = match self.network.node_status().await {
Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
_ => match self.network.routable_addr().await {
Some(addr) => vec![addr],
None => Vec::new(),
},
};
let bind_port = self
.network
.bound_addr()
.await
.map(|a| a.port())
.unwrap_or(5483);
if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
if let Ok(local) = sock.local_addr() {
if let std::net::IpAddr::V6(v6) = local.ip() {
let segs = v6.segments();
let is_global = (segs[0] & 0xffc0) != 0xfe80
&& (segs[0] & 0xff00) != 0xfd00
&& !v6.is_loopback();
if is_global {
let v6_addr =
std::net::SocketAddr::new(std::net::IpAddr::V6(v6), bind_port);
if !addresses.contains(&v6_addr) {
addresses.push(v6_addr);
}
}
}
}
}
}
if let Ok(sock) = std::net::UdpSocket::bind("0.0.0.0:0") {
if sock.connect("8.8.8.8:80").is_ok() {
if let Ok(local) = sock.local_addr() {
if let std::net::IpAddr::V4(v4) = local.ip() {
if v4.is_private() {
let lan_addr =
std::net::SocketAddr::new(std::net::IpAddr::V4(v4), bind_port);
if !addresses.contains(&lan_addr) {
addresses.push(lan_addr);
}
}
}
}
}
}
addresses.retain(|a| a.port() > 0 && !a.ip().is_unspecified() && !a.ip().is_loopback());
let (nat_type, can_receive_direct, is_relay, is_coordinator) =
match self.network.node_status().await {
Some(status) => (
Some(status.nat_type.to_string()),
Some(status.can_receive_direct),
Some(status.is_relaying),
Some(status.is_coordinating),
),
None => (None, None, None, None),
};
let include_user = self
.user_identity_consented
.load(std::sync::atomic::Ordering::Acquire);
let (user_id, agent_certificate) = if include_user {
(
self.identity
.user_keypair()
.map(identity::UserKeypair::user_id),
self.identity.agent_certificate().cloned(),
)
} else {
(None, None)
};
let unsigned = IdentityAnnouncementUnsigned {
agent_id: self.identity.agent_id(),
machine_id: self.identity.machine_id(),
user_id,
agent_certificate,
machine_public_key: machine_public_key.clone(),
addresses,
announced_at,
nat_type: nat_type.clone(),
can_receive_direct,
is_relay,
is_coordinator,
};
let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
error::IdentityError::Serialization(format!(
"heartbeat: failed to serialize announcement: {e}"
))
})?;
let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
self.identity.machine_keypair().secret_key(),
&unsigned_bytes,
)
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"heartbeat: failed to sign announcement: {:?}",
e
)))
})?
.as_bytes()
.to_vec();
let announcement = IdentityAnnouncement {
agent_id: unsigned.agent_id,
machine_id: unsigned.machine_id,
user_id: unsigned.user_id,
agent_certificate: unsigned.agent_certificate,
machine_public_key: machine_public_key.clone(),
machine_signature,
addresses: unsigned.addresses,
announced_at,
nat_type,
can_receive_direct,
is_relay,
is_coordinator,
};
let encoded = bincode::serialize(&announcement).map_err(|e| {
error::IdentityError::Serialization(format!(
"heartbeat: failed to serialize announcement: {e}"
))
})?;
self.runtime
.pubsub()
.publish(
IDENTITY_ANNOUNCE_TOPIC.to_string(),
bytes::Bytes::from(encoded),
)
.await
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"heartbeat: publish failed: {e}"
)))
})?;
let now = Agent::unix_timestamp_secs();
self.cache.write().await.insert(
announcement.agent_id,
DiscoveredAgent {
agent_id: announcement.agent_id,
machine_id: announcement.machine_id,
user_id: announcement.user_id,
addresses: announcement.addresses,
announced_at: announcement.announced_at,
last_seen: now,
machine_public_key: machine_public_key.clone(),
nat_type: announcement.nat_type.clone(),
can_receive_direct: announcement.can_receive_direct,
is_relay: announcement.is_relay,
is_coordinator: announcement.is_coordinator,
},
);
Ok(())
}
}
impl Agent {
pub async fn new() -> error::Result<Self> {
Agent::builder().build().await
}
/// Returns an [`AgentBuilder`] with every option unset and the peer cache
/// enabled.
pub fn builder() -> AgentBuilder {
    AgentBuilder {
        // Key material and where to find it.
        machine_key_path: None,
        agent_key_path: None,
        user_key_path: None,
        agent_keypair: None,
        user_keypair: None,
        // Networking and caches.
        network_config: None,
        peer_cache_dir: None,
        disable_peer_cache: false,
        contact_store_path: None,
        // Timing overrides.
        heartbeat_interval_secs: None,
        identity_ttl_secs: None,
    }
}
/// Borrows this agent's identity.
#[inline]
#[must_use]
pub fn identity(&self) -> &identity::Identity {
    &*self.identity
}
/// Returns the machine id from this agent's identity.
#[inline]
#[must_use]
pub fn machine_id(&self) -> identity::MachineId {
    let identity = &self.identity;
    identity.machine_id()
}
/// Returns the agent id from this agent's identity.
#[inline]
#[must_use]
pub fn agent_id(&self) -> identity::AgentId {
    let identity = &self.identity;
    identity.agent_id()
}
/// Returns the user id from this agent's identity, if one is configured.
#[inline]
#[must_use]
pub fn user_id(&self) -> Option<identity::UserId> {
    let identity = &self.identity;
    identity.user_id()
}
/// Borrows the agent certificate from this agent's identity, if present.
#[inline]
#[must_use]
pub fn agent_certificate(&self) -> Option<&identity::AgentCertificate> {
    let identity = &self.identity;
    identity.agent_certificate()
}
/// Borrows the shared network node, or `None` when no network is configured.
#[must_use]
pub fn network(&self) -> Option<&std::sync::Arc<network::NetworkNode>> {
    match &self.network {
        Some(node) => Some(node),
        None => None,
    }
}
/// Borrows the gossip cache adapter, if one was set up.
pub fn gossip_cache_adapter(&self) -> Option<&saorsa_gossip_coordinator::GossipCacheAdapter> {
    match &self.gossip_cache_adapter {
        Some(adapter) => Some(adapter),
        None => None,
    }
}
/// Borrows the shared presence wrapper, if the presence system is set up.
#[must_use]
pub fn presence_system(&self) -> Option<&std::sync::Arc<presence::PresenceWrapper>> {
    match &self.presence {
        Some(wrapper) => Some(wrapper),
        None => None,
    }
}
/// Borrows the shared, lock-protected contact store.
#[must_use]
pub fn contacts(&self) -> &std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>> {
    let store = &self.contact_store;
    store
}
/// Returns reachability information for `agent_id`, derived from its
/// discovery-cache entry, or `None` when the agent is not in the cache.
pub async fn reachability(
    &self,
    agent_id: &identity::AgentId,
) -> Option<connectivity::ReachabilityInfo> {
    let cache = self.identity_discovery_cache.read().await;
    let entry = cache.get(agent_id)?;
    Some(connectivity::ReachabilityInfo::from_discovered(entry))
}
pub async fn connect_to_agent(
&self,
agent_id: &identity::AgentId,
) -> error::Result<connectivity::ConnectOutcome> {
let discovered = {
let cache = self.identity_discovery_cache.read().await;
cache.get(agent_id).cloned()
};
let agent = match discovered {
Some(a) => a,
None => return Ok(connectivity::ConnectOutcome::NotFound),
};
let info = connectivity::ReachabilityInfo::from_discovered(&agent);
let Some(ref network) = self.network else {
return Ok(connectivity::ConnectOutcome::Unreachable);
};
let machine_peer_id = ant_quic::PeerId(agent.machine_id.0);
if network.is_connected(&machine_peer_id).await {
self.direct_messaging
.mark_connected(agent.agent_id, agent.machine_id)
.await;
return if let Some(addr) = info.addresses.first() {
Ok(connectivity::ConnectOutcome::Direct(*addr))
} else {
Ok(connectivity::ConnectOutcome::AlreadyConnected)
};
}
if info.addresses.is_empty() {
return Ok(connectivity::ConnectOutcome::Unreachable);
}
if info.should_attempt_direct() {
for addr in &info.addresses {
match network.connect_addr(*addr).await {
Ok(connected_peer_id) => {
let real_machine_id = identity::MachineId(connected_peer_id.0);
if let Some(ref bc) = self.bootstrap_cache {
bc.add_from_connection(connected_peer_id, vec![*addr], None)
.await;
}
{
let mut cache = self.identity_discovery_cache.write().await;
if let Some(entry) = cache.get_mut(agent_id) {
entry.machine_id = real_machine_id;
}
}
self.direct_messaging
.mark_connected(agent.agent_id, real_machine_id)
.await;
return Ok(connectivity::ConnectOutcome::Direct(*addr));
}
Err(e) => {
tracing::debug!("Direct connect to {} failed: {}", addr, e);
}
}
}
}
if info.needs_coordination() || !info.should_attempt_direct() {
let peer_id_hint = ant_quic::PeerId(agent.machine_id.0);
let hint_was_zeroed = agent.machine_id.0 == [0u8; 32];
match network.connect_peer(peer_id_hint).await {
Ok((addr, verified_peer_id)) => {
let verified_machine_id = identity::MachineId(verified_peer_id.0);
if !hint_was_zeroed {
if let Some(ref bc) = self.bootstrap_cache {
bc.add_from_connection(verified_peer_id, vec![addr], None)
.await;
bc.record_success(&verified_peer_id, 0).await;
}
{
let mut cache = self.identity_discovery_cache.write().await;
if let Some(entry) = cache.get_mut(agent_id) {
entry.machine_id = verified_machine_id;
}
}
}
if !hint_was_zeroed {
self.direct_messaging
.mark_connected(agent.agent_id, verified_machine_id)
.await;
}
return Ok(connectivity::ConnectOutcome::Coordinated(addr));
}
Err(e) => {
tracing::debug!("Hole-punch to {:?} failed: {}", agent.machine_id, e);
}
}
}
Ok(connectivity::ConnectOutcome::Unreachable)
}
pub async fn shutdown(&self) {
if let Some(ref pw) = self.presence {
pw.shutdown().await;
tracing::info!("Presence system shut down");
}
if let Some(ref cache) = self.bootstrap_cache {
if let Err(e) = cache.save().await {
tracing::warn!("Failed to save bootstrap cache on shutdown: {e}");
} else {
tracing::info!("Bootstrap cache saved on shutdown");
}
}
}
pub async fn send_direct(
&self,
agent_id: &identity::AgentId,
payload: Vec<u8>,
) -> error::NetworkResult<()> {
let network = self.network.as_ref().ok_or_else(|| {
error::NetworkError::NodeCreation("network not initialized".to_string())
})?;
let cached_machine_id = {
let cache = self.identity_discovery_cache.read().await;
cache
.get(agent_id)
.map(|d| d.machine_id)
.filter(|m| m.0 != [0u8; 32]) };
let machine_id = match cached_machine_id {
Some(id) => id,
None => {
match self.direct_messaging.get_machine_id(agent_id).await {
Some(id) => id,
None => {
let _ = self.connect_to_agent(agent_id).await;
self.direct_messaging
.get_machine_id(agent_id)
.await
.ok_or(error::NetworkError::AgentNotFound(agent_id.0))?
}
}
}
};
let ant_peer_id = ant_quic::PeerId(machine_id.0);
if !network.is_connected(&ant_peer_id).await {
return Err(error::NetworkError::AgentNotConnected(agent_id.0));
}
network
.send_direct(&ant_peer_id, &self.identity.agent_id().0, &payload)
.await?;
tracing::info!(
"Sent {} bytes directly to agent {:?}",
payload.len(),
agent_id
);
Ok(())
}
/// Waits for the next direct message, without any trust filtering.
/// Returns `None` when the underlying receive yields `None`.
pub async fn recv_direct(&self) -> Option<direct::DirectMessage> {
    let next = self.recv_direct_inner();
    next.await
}
/// Waits for the next direct message whose sender is not a blocked contact.
///
/// Messages from senders whose contact entry has trust level `Blocked` are
/// dropped (with a debug log) and the wait continues; senders without a
/// contact entry are let through. Returns `None` when the underlying receive
/// yields `None`.
pub async fn recv_direct_filtered(&self) -> Option<direct::DirectMessage> {
    loop {
        let msg = self.recv_direct_inner().await?;
        let sender_blocked = {
            let store = self.contact_store.read().await;
            store
                .get(&msg.sender)
                .is_some_and(|contact| contact.trust_level == contacts::TrustLevel::Blocked)
        };
        if !sender_blocked {
            return Some(msg);
        }
        tracing::debug!(
            "Dropping direct message from blocked agent {:?}",
            msg.sender
        );
    }
}
/// Shared receive primitive behind `recv_direct` and `recv_direct_filtered`:
/// awaits the next message from the direct-messaging subsystem.
async fn recv_direct_inner(&self) -> Option<direct::DirectMessage> {
    let messaging = &self.direct_messaging;
    messaging.recv().await
}
/// Creates a new receiver subscribed to direct messages.
pub fn subscribe_direct(&self) -> direct::DirectMessageReceiver {
    let messaging = &self.direct_messaging;
    messaging.subscribe()
}
/// Borrows the shared direct-messaging subsystem.
pub fn direct_messaging(&self) -> &std::sync::Arc<direct::DirectMessaging> {
    let messaging = &self.direct_messaging;
    messaging
}
/// Reports whether the machine recorded for `agent_id` in the discovery
/// cache is currently connected. Returns `false` when no network is
/// configured or the agent is not in the cache.
pub async fn is_agent_connected(&self, agent_id: &identity::AgentId) -> bool {
    let Some(network) = self.network.as_ref() else {
        return false;
    };
    // Copy the id out so the cache lock is not held across the network query.
    let known_machine = {
        let cache = self.identity_discovery_cache.read().await;
        cache.get(agent_id).map(|entry| entry.machine_id)
    };
    let Some(machine_id) = known_machine else {
        return false;
    };
    let ant_peer_id = ant_quic::PeerId(machine_id.0);
    network.is_connected(&ant_peer_id).await
}
/// Lists the ids of discovered agents whose recorded machine is among the
/// network's currently connected peers. Empty when no network is configured.
pub async fn connected_agents(&self) -> Vec<identity::AgentId> {
    let Some(network) = self.network.as_ref() else {
        return Vec::new();
    };
    let connected_peers = network.connected_peers().await;
    let cache = self.identity_discovery_cache.read().await;

    let mut connected = Vec::new();
    for entry in cache.values() {
        let ant_peer_id = ant_quic::PeerId(entry.machine_id.0);
        if connected_peers.contains(&ant_peer_id) {
            connected.push(entry.agent_id);
        }
    }
    connected
}
/// Hands `store` to the gossip pub/sub layer. Does nothing when no gossip
/// runtime is configured.
pub fn set_contacts(&self, store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>) {
    let Some(runtime) = self.gossip_runtime.as_ref() else {
        return;
    };
    runtime.pubsub().set_contacts(store);
}
pub async fn announce_identity(
&self,
include_user_identity: bool,
human_consent: bool,
) -> error::Result<()> {
let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"gossip runtime not initialized - configure agent with network first",
))
})?;
self.start_identity_listener().await?;
let mut addresses = if let Some(network) = self.network.as_ref() {
match network.node_status().await {
Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
_ => match network.routable_addr().await {
Some(addr) => vec![addr],
None => self.announcement_addresses(),
},
}
} else {
self.announcement_addresses()
};
let bind_port = if let Some(network) = self.network.as_ref() {
network.bound_addr().await.map(|a| a.port()).unwrap_or(5483)
} else {
5483
};
if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
if let Ok(local) = sock.local_addr() {
if let std::net::IpAddr::V6(v6) = local.ip() {
let segs = v6.segments();
let is_global = (segs[0] & 0xffc0) != 0xfe80
&& (segs[0] & 0xff00) != 0xfd00
&& !v6.is_loopback();
if is_global {
let v6_addr =
std::net::SocketAddr::new(std::net::IpAddr::V6(v6), bind_port);
if !addresses.contains(&v6_addr) {
addresses.push(v6_addr);
}
}
}
}
}
}
if let Ok(sock) = std::net::UdpSocket::bind("0.0.0.0:0") {
if sock.connect("8.8.8.8:80").is_ok() {
if let Ok(local) = sock.local_addr() {
if let std::net::IpAddr::V4(v4) = local.ip() {
if v4.is_private() {
let lan_addr =
std::net::SocketAddr::new(std::net::IpAddr::V4(v4), bind_port);
if !addresses.contains(&lan_addr) {
addresses.push(lan_addr);
}
}
}
}
}
}
addresses.retain(|a| a.port() > 0 && !a.ip().is_unspecified() && !a.ip().is_loopback());
let announcement = self.build_identity_announcement_with_addrs(
include_user_identity,
human_consent,
addresses,
)?;
let encoded = bincode::serialize(&announcement).map_err(|e| {
error::IdentityError::Serialization(format!(
"failed to serialize identity announcement: {e}"
))
})?;
let payload = bytes::Bytes::from(encoded);
let shard_topic = shard_topic_for_agent(&announcement.agent_id);
runtime
.pubsub()
.publish(shard_topic, payload.clone())
.await
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"failed to publish identity announcement to shard topic: {e}"
)))
})?;
runtime
.pubsub()
.publish(IDENTITY_ANNOUNCE_TOPIC.to_string(), payload)
.await
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"failed to publish identity announcement: {e}"
)))
})?;
let now = Self::unix_timestamp_secs();
self.identity_discovery_cache.write().await.insert(
announcement.agent_id,
DiscoveredAgent {
agent_id: announcement.agent_id,
machine_id: announcement.machine_id,
user_id: announcement.user_id,
addresses: announcement.addresses.clone(),
announced_at: announcement.announced_at,
last_seen: now,
machine_public_key: announcement.machine_public_key.clone(),
nat_type: announcement.nat_type.clone(),
can_receive_direct: announcement.can_receive_direct,
is_relay: announcement.is_relay,
is_coordinator: announcement.is_coordinator,
},
);
if include_user_identity && human_consent {
self.user_identity_consented
.store(true, std::sync::atomic::Ordering::Release);
}
Ok(())
}
/// Lists discovered agents whose `announced_at` is within the identity TTL,
/// sorted by agent id. Starts the identity listener if it is not running.
///
/// # Errors
/// Propagates the error from starting the identity listener.
pub async fn discovered_agents(&self) -> error::Result<Vec<DiscoveredAgent>> {
    self.start_identity_listener().await?;
    let oldest_allowed = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);

    let mut fresh = Vec::new();
    {
        let cache = self.identity_discovery_cache.read().await;
        for entry in cache.values() {
            if entry.announced_at >= oldest_allowed {
                fresh.push(entry.clone());
            }
        }
    }
    fresh.sort_by_key(|entry| entry.agent_id.0);
    Ok(fresh)
}
/// Lists every agent in the discovery cache regardless of age, sorted by
/// agent id. Starts the identity listener if it is not running.
///
/// # Errors
/// Propagates the error from starting the identity listener.
pub async fn discovered_agents_unfiltered(&self) -> error::Result<Vec<DiscoveredAgent>> {
    self.start_identity_listener().await?;

    let mut everyone = Vec::new();
    {
        let cache = self.identity_discovery_cache.read().await;
        for entry in cache.values() {
            everyone.push(entry.clone());
        }
    }
    everyone.sort_by_key(|entry| entry.agent_id.0);
    Ok(everyone)
}
/// Looks up one agent in the discovery cache (no TTL filtering), returning a
/// clone of its entry. Starts the identity listener if it is not running.
///
/// # Errors
/// Propagates the error from starting the identity listener.
pub async fn discovered_agent(
    &self,
    agent_id: identity::AgentId,
) -> error::Result<Option<DiscoveredAgent>> {
    self.start_identity_listener().await?;
    let cache = self.identity_discovery_cache.read().await;
    let found = cache.get(&agent_id).cloned();
    Ok(found)
}
/// Spawns (at most once) the background task that consumes identity
/// announcements and feeds the discovery cache.
///
/// The task subscribes to [`IDENTITY_ANNOUNCE_TOPIC`] and to this agent's own
/// identity shard topic. For every message it:
/// 1. decodes the payload as an [`IdentityAnnouncement`] and verifies it;
/// 2. drops it if the trust evaluator rejects the agent or machine;
/// 3. records the announcing machine in the contact store;
/// 4. adds the globally routable addresses to the bootstrap cache;
/// 5. upserts the agent in the discovery cache;
/// 6. for agents other than this one that are not yet connected, spawns one
///    auto-connect attempt per agent id for the lifetime of the task.
///
/// # Errors
/// Returns `IdentityError::Storage` when no gossip runtime is configured.
async fn start_identity_listener(&self) -> error::Result<()> {
    let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
        error::IdentityError::Storage(std::io::Error::other(
            "gossip runtime not initialized - configure agent with network first",
        ))
    })?;
    // `swap` returns the previous value: only the first caller proceeds.
    if self
        .identity_listener_started
        .swap(true, std::sync::atomic::Ordering::AcqRel)
    {
        return Ok(());
    }
    let mut sub_legacy = runtime
        .pubsub()
        .subscribe(IDENTITY_ANNOUNCE_TOPIC.to_string())
        .await;
    let own_shard_topic = shard_topic_for_agent(&self.agent_id());
    let mut sub_shard = runtime.pubsub().subscribe(own_shard_topic).await;
    // Clone the shared handles the spawned task needs; it cannot borrow `self`.
    let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
    let bootstrap_cache = self.bootstrap_cache.clone();
    let contact_store = std::sync::Arc::clone(&self.contact_store);
    let network = self.network.as_ref().map(std::sync::Arc::clone);
    let own_agent_id = self.agent_id();
    tokio::spawn(async move {
        // Agents for which an auto-connect has already been launched.
        let mut auto_connect_attempted = std::collections::HashSet::<identity::AgentId>::new();
        loop {
            // Take the next message from either subscription; stop when both
            // have ended.
            let msg = tokio::select! {
                Some(m) = sub_legacy.recv() => m,
                Some(m) = sub_shard.recv() => m,
                else => break,
            };
            // Size-limited decode of the untrusted payload.
            let decoded = {
                use bincode::Options;
                bincode::options()
                    .with_fixint_encoding()
                    .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
                    .allow_trailing_bytes()
                    .deserialize::<IdentityAnnouncement>(&msg.payload)
            };
            let announcement = match decoded {
                Ok(a) => a,
                Err(e) => {
                    tracing::debug!("Ignoring invalid identity announcement payload: {}", e);
                    continue;
                }
            };
            if let Err(e) = announcement.verify() {
                tracing::warn!("Ignoring unverifiable identity announcement: {}", e);
                continue;
            }
            // Trust check under a read lock, released before the write below.
            {
                let store = contact_store.read().await;
                let evaluator = trust::TrustEvaluator::new(&store);
                let decision = evaluator.evaluate(&trust::TrustContext {
                    agent_id: &announcement.agent_id,
                    machine_id: &announcement.machine_id,
                });
                match decision {
                    trust::TrustDecision::RejectBlocked => {
                        tracing::debug!(
                            "Dropping identity announcement from blocked agent {:?}",
                            hex::encode(&announcement.agent_id.0[..8]),
                        );
                        continue;
                    }
                    trust::TrustDecision::RejectMachineMismatch => {
                        tracing::warn!(
                            "Dropping identity announcement from agent {:?}: machine {:?} not in pinned list",
                            hex::encode(&announcement.agent_id.0[..8]),
                            hex::encode(&announcement.machine_id.0[..8]),
                        );
                        continue;
                    }
                    _ => {}
                }
            }
            // Record the announcing machine against the agent.
            {
                let mut store = contact_store.write().await;
                let record = contacts::MachineRecord::new(announcement.machine_id, None);
                store.add_machine(&announcement.agent_id, record);
            }
            // Local receive time; 0 if the clock is before the Unix epoch.
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map_or(0, |d| d.as_secs());
            // Only globally routable addresses go into the bootstrap cache.
            {
                let public_addrs: Vec<std::net::SocketAddr> = announcement
                    .addresses
                    .iter()
                    .copied()
                    .filter(|a| is_globally_routable(a.ip()))
                    .collect();
                if !public_addrs.is_empty() {
                    if let Some(ref bc) = &bootstrap_cache {
                        let peer_id = ant_quic::PeerId(announcement.machine_id.0);
                        bc.add_from_connection(peer_id, public_addrs.clone(), None)
                            .await;
                        tracing::debug!(
                            "Added {} public addresses to bootstrap cache for agent {:?} (machine {:?})",
                            public_addrs.len(),
                            announcement.agent_id,
                            hex::encode(&announcement.machine_id.0[..8]),
                        );
                    }
                }
            }
            // Insert or replace the discovery-cache entry for this agent.
            cache.write().await.insert(
                announcement.agent_id,
                DiscoveredAgent {
                    agent_id: announcement.agent_id,
                    machine_id: announcement.machine_id,
                    user_id: announcement.user_id,
                    addresses: announcement.addresses.clone(),
                    announced_at: announcement.announced_at,
                    last_seen: now,
                    machine_public_key: announcement.machine_public_key.clone(),
                    nat_type: announcement.nat_type.clone(),
                    can_receive_direct: announcement.can_receive_direct,
                    is_relay: announcement.is_relay,
                    is_coordinator: announcement.is_coordinator,
                },
            );
            // Auto-connect: skip ourselves, address-less agents, and agents
            // already attempted.
            if announcement.agent_id != own_agent_id
                && !announcement.addresses.is_empty()
                && !auto_connect_attempted.contains(&announcement.agent_id)
            {
                if let Some(ref net) = &network {
                    let ant_peer = ant_quic::PeerId(announcement.machine_id.0);
                    if !net.is_connected(&ant_peer).await {
                        auto_connect_attempted.insert(announcement.agent_id);
                        let net = std::sync::Arc::clone(net);
                        let addresses = announcement.addresses.clone();
                        // Connect in a separate task so the listener keeps
                        // draining messages.
                        tokio::spawn(async move {
                            for addr in &addresses {
                                match net.connect_addr(*addr).await {
                                    Ok(_) => {
                                        tracing::info!(
                                            "Auto-connected to discovered agent at {addr}",
                                        );
                                        return;
                                    }
                                    Err(e) => {
                                        tracing::debug!("Auto-connect to {addr} failed: {e}",);
                                    }
                                }
                            }
                            tracing::debug!(
                                "Auto-connect exhausted all {} addresses for discovered agent",
                                addresses.len(),
                            );
                        });
                    }
                }
            }
        }
    });
    Ok(())
}
/// Returns the current wall-clock time as whole seconds since the Unix
/// epoch, or `0` if the system clock reads earlier than the epoch.
fn unix_timestamp_secs() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
/// Returns the network node's local address as a one-element list, or an
/// empty list when there is no network, no local address, the port is 0, or
/// the IP is unspecified.
fn announcement_addresses(&self) -> Vec<std::net::SocketAddr> {
    let Some(node) = self.network.as_ref() else {
        return Vec::new();
    };
    match node.local_addr() {
        Some(addr) if addr.port() > 0 && !addr.ip().is_unspecified() => vec![addr],
        Some(_) | None => Vec::new(),
    }
}
/// Builds a signed identity announcement that advertises the addresses from
/// `announcement_addresses()`.
///
/// # Errors
/// Same as `build_identity_announcement_with_addrs`.
fn build_identity_announcement(
    &self,
    include_user_identity: bool,
    human_consent: bool,
) -> error::Result<IdentityAnnouncement> {
    let addresses = self.announcement_addresses();
    self.build_identity_announcement_with_addrs(include_user_identity, human_consent, addresses)
}
fn build_identity_announcement_with_addrs(
&self,
include_user_identity: bool,
human_consent: bool,
addresses: Vec<std::net::SocketAddr>,
) -> error::Result<IdentityAnnouncement> {
if include_user_identity && !human_consent {
return Err(error::IdentityError::Storage(std::io::Error::other(
"human identity disclosure requires explicit human consent — set human_consent: true in the request body",
)));
}
let (user_id, agent_certificate) = if include_user_identity {
let user_id = self.user_id().ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"human identity disclosure requested but no user identity is configured — set user_key_path in your config.toml to point at your user keypair file",
))
})?;
let cert = self.agent_certificate().cloned().ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"human identity disclosure requested but agent certificate is missing",
))
})?;
(Some(user_id), Some(cert))
} else {
(None, None)
};
let machine_public_key = self
.identity
.machine_keypair()
.public_key()
.as_bytes()
.to_vec();
let unsigned = IdentityAnnouncementUnsigned {
agent_id: self.agent_id(),
machine_id: self.machine_id(),
user_id,
agent_certificate: agent_certificate.clone(),
machine_public_key: machine_public_key.clone(),
addresses,
announced_at: Self::unix_timestamp_secs(),
nat_type: None,
can_receive_direct: None,
is_relay: None,
is_coordinator: None,
};
let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
error::IdentityError::Serialization(format!(
"failed to serialize unsigned identity announcement: {e}"
))
})?;
let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
self.identity.machine_keypair().secret_key(),
&unsigned_bytes,
)
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"failed to sign identity announcement with machine key: {:?}",
e
)))
})?
.as_bytes()
.to_vec();
Ok(IdentityAnnouncement {
agent_id: unsigned.agent_id,
machine_id: unsigned.machine_id,
user_id: unsigned.user_id,
agent_certificate: unsigned.agent_certificate,
machine_public_key,
machine_signature,
addresses: unsigned.addresses,
announced_at: unsigned.announced_at,
nat_type: unsigned.nat_type,
can_receive_direct: unsigned.can_receive_direct,
is_relay: unsigned.is_relay,
is_coordinator: unsigned.is_coordinator,
})
}
/// Joins the network: starts the gossip stack, connects to peers in phases,
/// seeds membership and presence, and announces this agent's identity.
///
/// When no network is configured this is a no-op that returns `Ok(())`.
///
/// Connection phases (the target is 3 connected addresses):
/// - Phase 0: addresses of up to 6 coordinators selected from the bootstrap cache.
/// - Phase 1: up to 12 cached peers, tried only while below the target.
/// - Phase 2: configured bootstrap nodes, tried only while below the target,
///   with two retry rounds for the addresses that failed.
///
/// # Errors
///
/// Returns an error only if the gossip runtime fails to start or the identity
/// listener fails to start. Connection, membership, presence, announcement and
/// heartbeat failures are logged and do not fail the call.
pub async fn join_network(&self) -> error::Result<()> {
let Some(network) = self.network.as_ref() else {
tracing::debug!("join_network called but no network configured");
return Ok(());
};
// Bring up the gossip runtime before any listener or connection work.
if let Some(ref runtime) = self.gossip_runtime {
runtime.start().await.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"failed to start gossip runtime: {e}"
)))
})?;
tracing::info!("Gossip runtime started");
}
self.start_identity_listener().await?;
self.start_direct_listener();
let bootstrap_nodes = network.config().bootstrap_nodes.clone();
// Target number of connected addresses; later phases are skipped once reached.
let min_connected = 3;
let mut all_connected: Vec<std::net::SocketAddr> = Vec::new();
// Phase 0: every preferred address of the cached coordinators.
if let Some(ref cache) = self.bootstrap_cache {
let coordinators = cache.select_coordinators(6).await;
let coordinator_addrs: Vec<std::net::SocketAddr> = coordinators
.iter()
.flat_map(|peer| peer.preferred_addresses())
.collect();
if !coordinator_addrs.is_empty() {
tracing::info!(
"Phase 0: Trying {} addresses from {} cached coordinators",
coordinator_addrs.len(),
coordinators.len()
);
let (succeeded, _failed) = self
.connect_peers_parallel_tracked(network, &coordinator_addrs)
.await;
all_connected.extend(&succeeded);
tracing::info!(
"Phase 0: {}/{} coordinator addresses connected",
succeeded.len(),
coordinator_addrs.len()
);
}
}
// Phase 1: cached peers, only while still below the target.
if all_connected.len() < min_connected {
if let Some(ref cache) = self.bootstrap_cache {
const PHASE1_PEER_CANDIDATES: usize = 12;
let cached_peers = cache.select_peers(PHASE1_PEER_CANDIDATES).await;
if !cached_peers.is_empty() {
tracing::info!("Phase 1: Trying {} cached peers", cached_peers.len());
let (succeeded, _failed) = self
.connect_cached_peers_parallel_tracked(network, &cached_peers)
.await;
all_connected.extend(&succeeded);
tracing::info!(
"Phase 1: {}/{} cached peers connected",
succeeded.len(),
cached_peers.len()
);
}
}
}
// Phase 2: configured bootstrap nodes that are not already connected.
if all_connected.len() < min_connected && !bootstrap_nodes.is_empty() {
let remaining: Vec<std::net::SocketAddr> = bootstrap_nodes
.iter()
.filter(|addr| !all_connected.contains(addr))
.copied()
.collect();
let (succeeded, mut failed) = self
.connect_peers_parallel_tracked(network, &remaining)
.await;
all_connected.extend(&succeeded);
tracing::info!(
"Phase 2 round 1: {}/{} bootstrap peers connected",
succeeded.len(),
remaining.len()
);
// Rounds 2 and 3 retry only the failed addresses, after a 10s and a 15s pause.
// The retries stop early only when nothing is left to retry.
for round in 2..=3 {
if failed.is_empty() {
break;
}
let delay = std::time::Duration::from_secs(if round == 2 { 10 } else { 15 });
tracing::info!(
"Retrying {} failed peers in {}s (round {})",
failed.len(),
delay.as_secs(),
round
);
tokio::time::sleep(delay).await;
let (succeeded, still_failed) =
self.connect_peers_parallel_tracked(network, &failed).await;
all_connected.extend(&succeeded);
failed = still_failed;
tracing::info!(
"Phase 2 round {}: {} total peers connected",
round,
all_connected.len()
);
}
if !failed.is_empty() {
tracing::warn!(
"Could not connect to {} bootstrap peers: {:?}",
failed.len(),
failed
);
}
}
tracing::info!(
"Network join complete. Connected to {} peers.",
all_connected.len()
);
// Hand the connected addresses to the membership layer as join seeds.
// A failed join is logged only.
if let Some(ref runtime) = self.gossip_runtime {
let seeds: Vec<String> = all_connected.iter().map(|addr| addr.to_string()).collect();
if !seeds.is_empty() {
if let Err(e) = runtime.membership().join(seeds).await {
tracing::warn!("HyParView membership join failed: {e}");
}
}
}
// Presence setup: broadcast peers, address hints, beacons, then the event loop.
if let Some(ref pw) = self.presence {
if let Some(ref runtime) = self.gossip_runtime {
// Seed the presence broadcast set from the membership active view.
let active = runtime.membership().active_view();
for peer in active {
pw.manager().add_broadcast_peer(peer).await;
}
tracing::info!(
"Presence seeded with {} broadcast peers",
pw.manager().broadcast_peer_count().await
);
}
if let Some(ref net) = self.network {
if let Some(status) = net.node_status().await {
// Address hints are the external addresses followed by the local address.
let mut hints: Vec<String> = status
.external_addrs
.iter()
.map(|a| a.to_string())
.collect();
hints.push(status.local_addr.to_string());
pw.manager().set_addr_hints(hints).await;
}
}
if pw.config().enable_beacons {
if let Err(e) = pw
.manager()
.start_beacons(pw.config().beacon_interval_secs)
.await
{
tracing::warn!("Failed to start presence beacons: {e}");
} else {
tracing::info!(
"Presence beacons started (interval={}s)",
pw.config().beacon_interval_secs
);
}
}
pw.start_event_loop(std::sync::Arc::clone(&self.identity_discovery_cache))
.await;
tracing::debug!("Presence event loop started");
}
// Initial announcement and heartbeat; failures are logged, not returned.
if let Err(e) = self.announce_identity(false, false).await {
tracing::warn!("Initial identity announcement failed: {}", e);
}
if let Err(e) = self.start_identity_heartbeat().await {
tracing::warn!("Failed to start identity heartbeat: {e}");
}
// Spawn a detached task that announces once more after a 3 second delay.
if let (Some(ref runtime), Some(ref network)) = (&self.gossip_runtime, &self.network) {
let ctx = HeartbeatContext {
identity: std::sync::Arc::clone(&self.identity),
runtime: std::sync::Arc::clone(runtime),
network: std::sync::Arc::clone(network),
interval_secs: self.heartbeat_interval_secs,
cache: std::sync::Arc::clone(&self.identity_discovery_cache),
user_identity_consented: std::sync::Arc::clone(&self.user_identity_consented),
};
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
if let Err(e) = ctx.announce().await {
tracing::warn!("Delayed identity re-announcement failed: {e}");
} else {
tracing::info!(
"Delayed identity re-announcement sent (gossip mesh stabilized)"
);
}
});
}
Ok(())
}
async fn connect_cached_peers_parallel_tracked(
&self,
network: &std::sync::Arc<network::NetworkNode>,
peers: &[ant_quic::CachedPeer],
) -> (Vec<std::net::SocketAddr>, Vec<ant_quic::PeerId>) {
use tokio::time::{timeout, Duration};
const CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
let handles: Vec<_> = peers
.iter()
.map(|peer| {
let net = network.clone();
let peer_id = peer.peer_id;
tokio::spawn(async move {
tracing::debug!("Connecting to cached peer: {:?}", peer_id);
match timeout(CONNECT_TIMEOUT, net.connect_cached_peer(peer_id)).await {
Ok(Ok(addr)) => {
tracing::info!("Connected to cached peer {:?} at {}", peer_id, addr);
Ok(addr)
}
Ok(Err(e)) => {
tracing::warn!("Failed to connect to cached peer {:?}: {}", peer_id, e);
Err(peer_id)
}
Err(_) => {
tracing::warn!(
"Connection to cached peer {:?} timed out after {}s",
peer_id,
CONNECT_TIMEOUT.as_secs()
);
Err(peer_id)
}
}
})
})
.collect();
let mut succeeded = Vec::new();
let mut failed = Vec::new();
for handle in handles {
match handle.await {
Ok(Ok(addr)) => succeeded.push(addr),
Ok(Err(peer_id)) => failed.push(peer_id),
Err(e) => tracing::error!("Connection task panicked: {}", e),
}
}
(succeeded, failed)
}
async fn connect_peers_parallel_tracked(
&self,
network: &std::sync::Arc<network::NetworkNode>,
addrs: &[std::net::SocketAddr],
) -> (Vec<std::net::SocketAddr>, Vec<std::net::SocketAddr>) {
use tokio::time::{timeout, Duration};
const CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
let handles: Vec<_> = addrs
.iter()
.map(|addr| {
let net = network.clone();
let addr = *addr;
tokio::spawn(async move {
tracing::debug!("Connecting to peer: {}", addr);
match timeout(CONNECT_TIMEOUT, net.connect_addr(addr)).await {
Ok(Ok(_)) => {
tracing::info!("Connected to peer: {}", addr);
Ok(addr)
}
Ok(Err(e)) => {
tracing::warn!("Failed to connect to {}: {}", addr, e);
Err(addr)
}
Err(_) => {
tracing::warn!(
"Connection to {} timed out after {}s",
addr,
CONNECT_TIMEOUT.as_secs()
);
Err(addr)
}
}
})
})
.collect();
let mut succeeded = Vec::new();
let mut failed = Vec::new();
for handle in handles {
match handle.await {
Ok(Ok(addr)) => succeeded.push(addr),
Ok(Err(addr)) => failed.push(addr),
Err(e) => tracing::error!("Connection task panicked: {}", e),
}
}
(succeeded, failed)
}
pub async fn subscribe(&self, topic: &str) -> error::Result<Subscription> {
let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"gossip runtime not initialized - configure agent with network first",
))
})?;
Ok(runtime.pubsub().subscribe(topic.to_string()).await)
}
pub async fn publish(&self, topic: &str, payload: Vec<u8>) -> error::Result<()> {
let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"gossip runtime not initialized - configure agent with network first",
))
})?;
runtime
.pubsub()
.publish(topic.to_string(), bytes::Bytes::from(payload))
.await
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"publish failed: {}",
e
)))
})
}
pub async fn peers(&self) -> error::Result<Vec<saorsa_gossip_types::PeerId>> {
let network = self.network.as_ref().ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"network not initialized - configure agent with network first",
))
})?;
let ant_peers = network.connected_peers().await;
Ok(ant_peers
.into_iter()
.map(|p| saorsa_gossip_types::PeerId::new(p.0))
.collect())
}
/// Returns the ids of cached agents whose announcement is still within the
/// identity TTL, sorted by raw id bytes.
///
/// Starts the identity listener first.
///
/// # Errors
///
/// Returns an error if the identity listener cannot be started.
pub async fn presence(&self) -> error::Result<Vec<identity::AgentId>> {
    self.start_identity_listener().await?;
    let oldest_allowed = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
    let mut online = Vec::new();
    {
        let cache = self.identity_discovery_cache.read().await;
        for entry in cache.values() {
            if entry.announced_at >= oldest_allowed {
                online.push(entry.agent_id);
            }
        }
    }
    online.sort_by_key(|id| id.0);
    Ok(online)
}
/// Returns a broadcast receiver for presence events.
///
/// Calls `start_event_loop` on the presence wrapper with the shared identity
/// discovery cache before subscribing.
///
/// # Errors
///
/// Returns `NetworkError::NodeError` when the presence system is not initialized.
pub async fn subscribe_presence(
    &self,
) -> error::NetworkResult<tokio::sync::broadcast::Receiver<presence::PresenceEvent>> {
    let wrapper = self.presence.as_ref().ok_or_else(|| {
        error::NetworkError::NodeError("presence system not initialized".to_string())
    })?;
    let discovery_cache = std::sync::Arc::clone(&self.identity_discovery_cache);
    wrapper.start_event_loop(discovery_cache).await;
    let receiver = wrapper.subscribe_events();
    Ok(receiver)
}
/// Looks up an agent in the local identity discovery cache and returns a
/// clone of its entry, if any. No TTL filtering is applied here.
pub async fn cached_agent(&self, id: &identity::AgentId) -> Option<DiscoveredAgent> {
    let cache = self.identity_discovery_cache.read().await;
    cache.get(id).cloned()
}
/// Runs a FOAF query on the global presence topic and converts the returned
/// presence records into discovered agents.
///
/// `ttl` and `timeout_ms` are forwarded unchanged to the presence manager's
/// query. Records that cannot be converted are skipped, and only the first
/// result for each agent id is kept.
///
/// # Errors
///
/// Returns `NetworkError::NodeError` when the presence system is not
/// initialized or the query itself fails.
pub async fn discover_agents_foaf(
    &self,
    ttl: u8,
    timeout_ms: u64,
) -> error::NetworkResult<Vec<DiscoveredAgent>> {
    let wrapper = self.presence.as_ref().ok_or_else(|| {
        error::NetworkError::NodeError("presence system not initialized".to_string())
    })?;
    let topic = presence::global_presence_topic();
    let raw_results: Vec<(
        saorsa_gossip_types::PeerId,
        saorsa_gossip_types::PresenceRecord,
    )> = wrapper
        .manager()
        .initiate_foaf_query(topic, ttl, timeout_ms)
        .await
        .map_err(|e| error::NetworkError::NodeError(e.to_string()))?;

    // The cache stays read-locked while the records are converted.
    let cache = self.identity_discovery_cache.read().await;
    let mut already_seen = std::collections::HashSet::<identity::AgentId>::new();
    let mut agents: Vec<DiscoveredAgent> = Vec::with_capacity(raw_results.len());
    agents.extend(
        raw_results
            .iter()
            .filter_map(|(peer_id, record)| {
                presence::presence_record_to_discovered_agent(*peer_id, record, &cache)
            })
            // `insert` returns false for an id that was already emitted.
            .filter(|agent| already_seen.insert(agent.agent_id)),
    );
    Ok(agents)
}
/// Finds a single agent by id: the local discovery cache is consulted first,
/// and only on a miss is a FOAF query issued and its results searched.
///
/// # Errors
///
/// Propagates any error from [`Self::discover_agents_foaf`].
pub async fn discover_agent_by_id(
    &self,
    target_id: identity::AgentId,
    ttl: u8,
    timeout_ms: u64,
) -> error::NetworkResult<Option<DiscoveredAgent>> {
    // The read guard is released at the end of this statement, before any query.
    let cached = self
        .identity_discovery_cache
        .read()
        .await
        .get(&target_id)
        .cloned();
    if cached.is_some() {
        return Ok(cached);
    }
    let discovered = self.discover_agents_foaf(ttl, timeout_ms).await?;
    let hit = discovered
        .into_iter()
        .find(|candidate| candidate.agent_id == target_id);
    Ok(hit)
}
pub async fn find_agent(
&self,
agent_id: identity::AgentId,
) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
self.start_identity_listener().await?;
if let Some(addrs) = self
.identity_discovery_cache
.read()
.await
.get(&agent_id)
.map(|e| e.addresses.clone())
{
return Ok(Some(addrs));
}
let runtime = match self.gossip_runtime.as_ref() {
Some(r) => r,
None => return Ok(None),
};
let shard_topic = shard_topic_for_agent(&agent_id);
let mut sub = runtime.pubsub().subscribe(shard_topic).await;
let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
loop {
if tokio::time::Instant::now() >= deadline {
break;
}
let timeout = tokio::time::sleep_until(deadline);
tokio::select! {
Some(msg) = sub.recv() => {
if let Ok(ann) = {
use bincode::Options;
bincode::DefaultOptions::new()
.with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
.deserialize::<IdentityAnnouncement>(&msg.payload)
} {
if ann.verify().is_ok() && ann.agent_id == agent_id {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
let addrs = ann.addresses.clone();
cache.write().await.insert(
ann.agent_id,
DiscoveredAgent {
agent_id: ann.agent_id,
machine_id: ann.machine_id,
user_id: ann.user_id,
addresses: ann.addresses,
announced_at: ann.announced_at,
last_seen: now,
machine_public_key: ann.machine_public_key.clone(),
nat_type: ann.nat_type.clone(),
can_receive_direct: ann.can_receive_direct,
is_relay: ann.is_relay,
is_coordinator: ann.is_coordinator,
},
);
return Ok(Some(addrs));
}
}
}
_ = timeout => break,
}
}
if let Some(addrs) = self.find_agent_rendezvous(agent_id, 5).await? {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
cache
.write()
.await
.entry(agent_id)
.or_insert_with(|| DiscoveredAgent {
agent_id,
machine_id: identity::MachineId([0u8; 32]),
user_id: None,
addresses: addrs.clone(),
announced_at: now,
last_seen: now,
machine_public_key: Vec::new(),
nat_type: None,
can_receive_direct: None,
is_relay: None,
is_coordinator: None,
});
return Ok(Some(addrs));
}
Ok(None)
}
/// Returns clones of all cached agents that announced the given user id and
/// whose announcement is still within the identity TTL.
///
/// Starts the identity listener first.
///
/// # Errors
///
/// Returns an error if the identity listener cannot be started.
pub async fn find_agents_by_user(
    &self,
    user_id: identity::UserId,
) -> error::Result<Vec<DiscoveredAgent>> {
    self.start_identity_listener().await?;
    let oldest_allowed = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
    let cache = self.identity_discovery_cache.read().await;
    let mut matches = Vec::new();
    for agent in cache.values() {
        if agent.announced_at >= oldest_allowed && agent.user_id == Some(user_id) {
            matches.push(agent.clone());
        }
    }
    Ok(matches)
}
/// Returns the network node's local address, or `None` when there is no
/// network node or the node reports none.
#[must_use]
pub fn local_addr(&self) -> Option<std::net::SocketAddr> {
    let node = self.network.as_ref()?;
    node.local_addr()
}
pub async fn bound_addr(&self) -> Option<std::net::SocketAddr> {
if let Some(ref network) = self.network {
let addr = network.bound_addr().await;
match (addr, self.local_addr()) {
(Some(bound), Some(config)) if config.is_ipv4() && bound.is_ipv6() => {
Some(std::net::SocketAddr::new(config.ip(), bound.port()))
}
(Some(bound), _) => Some(bound),
_ => None,
}
} else {
None
}
}
/// Public wrapper around `build_identity_announcement`; both arguments are
/// forwarded unchanged.
///
/// # Errors
///
/// Propagates any error from `build_identity_announcement`.
pub fn build_announcement(
    &self,
    include_user: bool,
    consent: bool,
) -> error::Result<IdentityAnnouncement> {
    let announcement = self.build_identity_announcement(include_user, consent)?;
    Ok(announcement)
}
/// Spawns the background task that receives direct messages; it is started
/// at most once per agent.
///
/// Each payload must begin with the 32-byte sender agent id; shorter payloads
/// are logged and dropped. The sender is registered against the transport
/// peer's machine id and the remaining bytes are handed to the direct
/// messaging handler. Does nothing when there is no network node.
fn start_direct_listener(&self) {
    // `swap` returns the previous value, so only the first caller proceeds.
    let already_started = self
        .direct_listener_started
        .swap(true, std::sync::atomic::Ordering::AcqRel);
    if already_started {
        return;
    }
    let network = match self.network.as_ref() {
        Some(node) => std::sync::Arc::clone(node),
        None => return,
    };
    let messaging = std::sync::Arc::clone(&self.direct_messaging);
    tokio::spawn(async move {
        tracing::info!("Direct message listener started");
        while let Some((ant_peer_id, payload)) = network.recv_direct().await {
            if payload.len() < 32 {
                tracing::warn!("Direct message too short ({} bytes)", payload.len());
                continue;
            }
            // The first 32 bytes carry the sender's agent id; the rest is the message.
            let mut sender_bytes = [0u8; 32];
            sender_bytes.copy_from_slice(&payload[..32]);
            let sender = identity::AgentId(sender_bytes);
            let machine_id = identity::MachineId(ant_peer_id.0);
            let body = payload[32..].to_vec();
            messaging.register_agent(sender, machine_id).await;
            messaging.handle_incoming(machine_id, sender, body).await;
        }
        tracing::debug!("Direct message channel closed");
    });
}
/// Starts the periodic identity heartbeat task if it is not already running.
///
/// The task re-announces this agent's identity once per heartbeat interval.
/// The first announcement happens one full interval after start, because the
/// ticker's initial tick is consumed up front. Announce failures are logged
/// and the loop keeps running.
///
/// A configured interval of `0` seconds is treated as `1` second:
/// `tokio::time::interval` panics on a zero period, which would otherwise
/// kill the spawned task while this method still returned `Ok(())`.
///
/// # Errors
///
/// Returns an error when the gossip runtime or the network node is missing.
pub async fn start_identity_heartbeat(&self) -> error::Result<()> {
    // The guard is held for the whole call, so concurrent callers cannot both spawn.
    let mut handle_guard = self.heartbeat_handle.lock().await;
    if handle_guard.is_some() {
        return Ok(());
    }
    let Some(runtime) = self.gossip_runtime.as_ref().map(std::sync::Arc::clone) else {
        return Err(error::IdentityError::Storage(std::io::Error::other(
            "gossip runtime not initialized — cannot start heartbeat",
        )));
    };
    let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
        return Err(error::IdentityError::Storage(std::io::Error::other(
            "network not initialized — cannot start heartbeat",
        )));
    };
    let ctx = HeartbeatContext {
        identity: std::sync::Arc::clone(&self.identity),
        runtime,
        network,
        interval_secs: self.heartbeat_interval_secs,
        cache: std::sync::Arc::clone(&self.identity_discovery_cache),
        user_identity_consented: std::sync::Arc::clone(&self.user_identity_consented),
    };
    // Clamp only the ticker period; the context keeps the configured value.
    let period_secs = ctx.interval_secs.max(1);
    if period_secs != ctx.interval_secs {
        tracing::warn!(
            "identity heartbeat interval of 0s is invalid; using {period_secs}s instead"
        );
    }
    let handle = tokio::task::spawn(async move {
        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(period_secs));
        // The first tick of a tokio interval completes immediately; skip it.
        ticker.tick().await;
        loop {
            ticker.tick().await;
            if let Err(e) = ctx.announce().await {
                tracing::warn!("identity heartbeat announce failed: {e}");
            }
        }
    });
    *handle_guard = Some(handle);
    Ok(())
}
pub async fn advertise_identity(&self, validity_ms: u64) -> error::Result<()> {
use saorsa_gossip_rendezvous::{Capability, ProviderSummary};
let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"gossip runtime not initialized — cannot advertise identity",
))
})?;
let peer_id = runtime.peer_id();
let addresses = self.announcement_addresses();
let addr_bytes = bincode::serialize(&addresses).map_err(|e| {
error::IdentityError::Serialization(format!(
"failed to serialize addresses for rendezvous: {e}"
))
})?;
let mut summary = ProviderSummary::new(
self.agent_id().0,
peer_id,
vec![Capability::Identity],
validity_ms,
)
.with_extensions(addr_bytes);
summary
.sign_raw(self.identity.machine_keypair().secret_key().as_bytes())
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"failed to sign rendezvous summary: {e}"
)))
})?;
let cbor_bytes = summary.to_cbor().map_err(|e| {
error::IdentityError::Serialization(format!(
"failed to CBOR-encode rendezvous summary: {e}"
))
})?;
let topic = rendezvous_shard_topic_for_agent(&self.agent_id());
runtime
.pubsub()
.publish(topic, bytes::Bytes::from(cbor_bytes))
.await
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"failed to publish rendezvous summary: {e}"
)))
})?;
self.rendezvous_advertised
.store(true, std::sync::atomic::Ordering::Relaxed);
Ok(())
}
pub async fn find_agent_rendezvous(
&self,
agent_id: identity::AgentId,
timeout_secs: u64,
) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
use saorsa_gossip_rendezvous::ProviderSummary;
let runtime = match self.gossip_runtime.as_ref() {
Some(r) => r,
None => return Ok(None),
};
let topic = rendezvous_shard_topic_for_agent(&agent_id);
let mut sub = runtime.pubsub().subscribe(topic).await;
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
loop {
if tokio::time::Instant::now() >= deadline {
break;
}
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
tokio::select! {
Some(msg) = sub.recv() => {
let summary = match ProviderSummary::from_cbor(&msg.payload) {
Ok(s) => s,
Err(_) => continue,
};
if summary.target != agent_id.0 {
continue;
}
let cached_pub = self
.identity_discovery_cache
.read()
.await
.get(&agent_id)
.map(|e| e.machine_public_key.clone());
if let Some(pub_bytes) = cached_pub {
if !pub_bytes.is_empty()
&& !summary.verify_raw(&pub_bytes).unwrap_or(false)
{
tracing::warn!(
"Rendezvous summary signature verification failed for agent {:?}; discarding",
agent_id
);
continue;
}
}
let addrs: Vec<std::net::SocketAddr> = summary
.extensions
.as_deref()
.and_then(|b| {
use bincode::Options;
bincode::DefaultOptions::new()
.with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
.deserialize(b)
.ok()
})
.unwrap_or_default();
if !addrs.is_empty() {
return Ok(Some(addrs));
}
}
_ = tokio::time::sleep(remaining) => break,
}
}
Ok(None)
}
/// Test-only hook: inserts `agent` into the identity discovery cache under
/// its own agent id, replacing any existing entry.
#[doc(hidden)]
pub async fn insert_discovered_agent_for_testing(&self, agent: DiscoveredAgent) {
    let key = agent.agent_id;
    let mut cache = self.identity_discovery_cache.write().await;
    cache.insert(key, agent);
}
pub async fn create_task_list(&self, name: &str, topic: &str) -> error::Result<TaskListHandle> {
let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"gossip runtime not initialized - configure agent with network first",
))
})?;
let peer_id = runtime.peer_id();
let list_id = crdt::TaskListId::from_content(name, &self.agent_id(), 0);
let task_list = crdt::TaskList::new(list_id, name.to_string(), peer_id);
let sync = crdt::TaskListSync::new(
task_list,
std::sync::Arc::clone(runtime.pubsub()),
topic.to_string(),
30,
)
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"task list sync creation failed: {}",
e
)))
})?;
let sync = std::sync::Arc::new(sync);
sync.start().await.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"task list sync start failed: {}",
e
)))
})?;
Ok(TaskListHandle {
sync,
agent_id: self.agent_id(),
peer_id,
})
}
pub async fn join_task_list(&self, topic: &str) -> error::Result<TaskListHandle> {
let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"gossip runtime not initialized - configure agent with network first",
))
})?;
let peer_id = runtime.peer_id();
let list_id = crdt::TaskListId::from_content(topic, &self.agent_id(), 0);
let task_list = crdt::TaskList::new(list_id, String::new(), peer_id);
let sync = crdt::TaskListSync::new(
task_list,
std::sync::Arc::clone(runtime.pubsub()),
topic.to_string(),
30,
)
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"task list sync creation failed: {}",
e
)))
})?;
let sync = std::sync::Arc::new(sync);
sync.start().await.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"task list sync start failed: {}",
e
)))
})?;
Ok(TaskListHandle {
sync,
agent_id: self.agent_id(),
peer_id,
})
}
}
impl AgentBuilder {
    /// Sets the file the machine keypair is loaded from. If loading fails, a
    /// new keypair is generated and saved to this path.
    #[must_use]
    pub fn with_machine_key<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
        self.machine_key_path = Some(path.as_ref().to_path_buf());
        self
    }

    /// Supplies the agent keypair directly; this takes precedence over any
    /// agent key path.
    #[must_use]
    pub fn with_agent_key(mut self, keypair: identity::AgentKeypair) -> Self {
        self.agent_keypair = Some(keypair);
        self
    }

    /// Sets the file the agent keypair is loaded from. If loading fails, a
    /// new keypair is generated and saved to this path.
    #[must_use]
    pub fn with_agent_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
        self.agent_key_path = Some(path.as_ref().to_path_buf());
        self
    }

    /// Enables networking with the given configuration. Without it the agent
    /// is built with no network node, gossip runtime or presence system.
    #[must_use]
    pub fn with_network_config(mut self, config: network::NetworkConfig) -> Self {
        self.network_config = Some(config);
        self
    }

    /// Overrides the bootstrap peer cache directory (default: `~/.x0x/peers`).
    #[must_use]
    pub fn with_peer_cache_dir<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
        self.peer_cache_dir = Some(path.as_ref().to_path_buf());
        self
    }

    /// Disables the bootstrap peer cache entirely.
    #[must_use]
    pub fn with_peer_cache_disabled(mut self) -> Self {
        self.disable_peer_cache = true;
        self
    }

    /// Supplies the user keypair directly; this takes precedence over any
    /// user key path.
    #[must_use]
    pub fn with_user_key(mut self, keypair: identity::UserKeypair) -> Self {
        self.user_keypair = Some(keypair);
        self
    }

    /// Sets the file the user keypair is loaded from. A user key is never
    /// generated: if loading fails the agent is built without a user identity.
    #[must_use]
    pub fn with_user_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
        self.user_key_path = Some(path.as_ref().to_path_buf());
        self
    }

    /// Overrides the identity heartbeat interval, in seconds.
    #[must_use]
    pub fn with_heartbeat_interval(mut self, secs: u64) -> Self {
        self.heartbeat_interval_secs = Some(secs);
        self
    }

    /// Overrides the identity TTL, in seconds, used to filter cached agents.
    #[must_use]
    pub fn with_identity_ttl(mut self, secs: u64) -> Self {
        self.identity_ttl_secs = Some(secs);
        self
    }

    /// Overrides the contact store file (default: `~/.x0x/contacts.json`).
    #[must_use]
    pub fn with_contact_store_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
        self.contact_store_path = Some(path.as_ref().to_path_buf());
        self
    }

    /// Builds the [`Agent`].
    ///
    /// Steps, in order:
    /// 1. Resolve the machine keypair (explicit path, default storage, or newly generated).
    /// 2. Resolve the agent keypair (explicit keypair, explicit path, default
    ///    storage, or newly generated).
    /// 3. Resolve the optional user keypair; it is never generated.
    /// 4. With a user keypair, load or issue the agent certificate. A stored
    ///    certificate is reused only when its user id matches the user keypair.
    /// 5. If a network configuration is set, open the bootstrap cache (unless
    ///    disabled), then create the network node, gossip runtime and presence wrapper.
    ///
    /// # Errors
    ///
    /// Returns an error when key generation or saving fails, when a default
    /// keypair exists but cannot be loaded, or when certificate issuing,
    /// key conversion, or network, gossip or presence initialization fails.
    /// A bootstrap cache that cannot be opened and a user keypair that cannot
    /// be loaded are logged and tolerated.
    pub async fn build(self) -> error::Result<Agent> {
        // NOTE(review): with an explicit path, ANY load error (not only a missing
        // file) falls through to generating a new key and saving it to that path.
        let machine_keypair = if let Some(path) = self.machine_key_path {
            match storage::load_machine_keypair_from(&path).await {
                Ok(kp) => kp,
                Err(_) => {
                    let kp = identity::MachineKeypair::generate()?;
                    storage::save_machine_keypair_to(&kp, &path).await?;
                    kp
                }
            }
        } else if storage::machine_keypair_exists().await {
            storage::load_machine_keypair().await?
        } else {
            let kp = identity::MachineKeypair::generate()?;
            storage::save_machine_keypair(&kp).await?;
            kp
        };

        // Same fallback as for the machine key when an explicit path is given.
        let agent_keypair = if let Some(kp) = self.agent_keypair {
            kp
        } else if let Some(path) = self.agent_key_path {
            match storage::load_agent_keypair_from(&path).await {
                Ok(kp) => kp,
                Err(_) => {
                    let kp = identity::AgentKeypair::generate()?;
                    storage::save_agent_keypair_to(&kp, &path).await?;
                    kp
                }
            }
        } else if storage::agent_keypair_exists().await {
            storage::load_agent_keypair_default().await?
        } else {
            let kp = identity::AgentKeypair::generate()?;
            storage::save_agent_keypair_default(&kp).await?;
            kp
        };

        // The user identity is optional. A failed load is tolerated but logged,
        // so a silently missing user identity can be diagnosed.
        let user_keypair = if let Some(kp) = self.user_keypair {
            Some(kp)
        } else if let Some(path) = self.user_key_path {
            match storage::load_user_keypair_from(&path).await {
                Ok(kp) => Some(kp),
                Err(e) => {
                    tracing::warn!(
                        "Failed to load user keypair from {}: {e}; continuing without user identity",
                        path.display()
                    );
                    None
                }
            }
        } else if storage::user_keypair_exists().await {
            match storage::load_user_keypair().await {
                Ok(kp) => Some(kp),
                Err(e) => {
                    tracing::warn!(
                        "Failed to load stored user keypair: {e}; continuing without user identity"
                    );
                    None
                }
            }
        } else {
            None
        };

        let identity = if let Some(user_kp) = user_keypair {
            let cert = if storage::agent_certificate_exists().await {
                match storage::load_agent_certificate().await {
                    Ok(c) => {
                        // Reuse the stored certificate only if it belongs to this user.
                        let cert_matches_user = c
                            .user_id()
                            .map(|uid| uid == user_kp.user_id())
                            .unwrap_or(false);
                        if cert_matches_user {
                            c
                        } else {
                            let new_cert =
                                identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
                            storage::save_agent_certificate(&new_cert).await?;
                            new_cert
                        }
                    }
                    Err(_) => {
                        let c = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
                        storage::save_agent_certificate(&c).await?;
                        c
                    }
                }
            } else {
                let c = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
                storage::save_agent_certificate(&c).await?;
                c
            };
            identity::Identity::new_with_user(machine_keypair, agent_keypair, user_kp, cert)
        } else {
            identity::Identity::new(machine_keypair, agent_keypair)
        };

        // The bootstrap cache is only opened when networking is enabled.
        let bootstrap_cache = if self.network_config.is_some() && !self.disable_peer_cache {
            let cache_dir = self.peer_cache_dir.unwrap_or_else(|| {
                dirs::home_dir()
                    .unwrap_or_else(|| std::path::PathBuf::from("."))
                    .join(".x0x")
                    .join("peers")
            });
            let config = ant_quic::BootstrapCacheConfig::builder()
                .cache_dir(cache_dir)
                .min_peers_to_save(1)
                .build();
            match ant_quic::BootstrapCache::open(config).await {
                Ok(cache) => {
                    let cache = std::sync::Arc::new(cache);
                    std::sync::Arc::clone(&cache).start_maintenance();
                    Some(cache)
                }
                Err(e) => {
                    tracing::warn!("Failed to open bootstrap cache: {e}");
                    None
                }
            }
        } else {
            None
        };

        // Re-encode the machine keypair as ant-quic key types for the network node.
        let machine_keypair = {
            let pk = ant_quic::MlDsaPublicKey::from_bytes(
                identity.machine_keypair().public_key().as_bytes(),
            )
            .map_err(|e| {
                error::IdentityError::Storage(std::io::Error::other(format!(
                    "invalid machine public key: {e}"
                )))
            })?;
            let sk = ant_quic::MlDsaSecretKey::from_bytes(
                identity.machine_keypair().secret_key().as_bytes(),
            )
            .map_err(|e| {
                error::IdentityError::Storage(std::io::Error::other(format!(
                    "invalid machine secret key: {e}"
                )))
            })?;
            Some((pk, sk))
        };

        let network = if let Some(config) = self.network_config {
            let node = network::NetworkNode::new(config, bootstrap_cache.clone(), machine_keypair)
                .await
                .map_err(|e| {
                    error::IdentityError::Storage(std::io::Error::other(format!(
                        "network initialization failed: {}",
                        e
                    )))
                })?;
            debug_assert_eq!(
                node.peer_id().0,
                identity.machine_id().0,
                "ant-quic PeerId must equal MachineId after identity unification"
            );
            Some(std::sync::Arc::new(node))
        } else {
            None
        };

        let signing_ctx = std::sync::Arc::new(gossip::SigningContext::from_keypair(
            identity.agent_keypair(),
        ));
        let gossip_runtime = if let Some(ref net) = network {
            let runtime = gossip::GossipRuntime::new(
                gossip::GossipConfig::default(),
                std::sync::Arc::clone(net),
                Some(signing_ctx),
            )
            .await
            .map_err(|e| {
                error::IdentityError::Storage(std::io::Error::other(format!(
                    "gossip runtime initialization failed: {}",
                    e
                )))
            })?;
            Some(std::sync::Arc::new(runtime))
        } else {
            None
        };

        let contacts_path = self.contact_store_path.unwrap_or_else(|| {
            dirs::home_dir()
                .unwrap_or_else(|| std::path::PathBuf::from("."))
                .join(".x0x")
                .join("contacts.json")
        });
        let contact_store = std::sync::Arc::new(tokio::sync::RwLock::new(
            contacts::ContactStore::new(contacts_path),
        ));

        let gossip_cache_adapter = bootstrap_cache.as_ref().map(|cache| {
            saorsa_gossip_coordinator::GossipCacheAdapter::new(std::sync::Arc::clone(cache))
        });
        let direct_messaging = std::sync::Arc::new(direct::DirectMessaging::new());

        // Presence needs the network node; it is also handed to the gossip runtime.
        let presence = if let Some(ref net) = network {
            let peer_id = saorsa_gossip_transport::GossipTransport::local_peer_id(net.as_ref());
            let pw = presence::PresenceWrapper::new(
                peer_id,
                std::sync::Arc::clone(net),
                presence::PresenceConfig::default(),
                bootstrap_cache.clone(),
            )
            .map_err(|e| {
                error::IdentityError::Storage(std::io::Error::other(format!(
                    "presence initialization failed: {}",
                    e
                )))
            })?;
            let pw_arc = std::sync::Arc::new(pw);
            if let Some(ref rt) = gossip_runtime {
                rt.set_presence(std::sync::Arc::clone(&pw_arc));
            }
            Some(pw_arc)
        } else {
            None
        };

        Ok(Agent {
            identity: std::sync::Arc::new(identity),
            network,
            gossip_runtime,
            bootstrap_cache,
            gossip_cache_adapter,
            identity_discovery_cache: std::sync::Arc::new(tokio::sync::RwLock::new(
                std::collections::HashMap::new(),
            )),
            identity_listener_started: std::sync::atomic::AtomicBool::new(false),
            heartbeat_interval_secs: self
                .heartbeat_interval_secs
                .unwrap_or(IDENTITY_HEARTBEAT_INTERVAL_SECS),
            identity_ttl_secs: self.identity_ttl_secs.unwrap_or(IDENTITY_TTL_SECS),
            heartbeat_handle: tokio::sync::Mutex::new(None),
            rendezvous_advertised: std::sync::atomic::AtomicBool::new(false),
            contact_store,
            direct_messaging,
            direct_listener_started: std::sync::atomic::AtomicBool::new(false),
            presence,
            user_identity_consented: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
        })
    }
}
/// Cloneable handle to a synchronized task list.
///
/// Clones share the same underlying `TaskListSync` through an `Arc`.
#[derive(Clone)]
pub struct TaskListHandle {
/// Shared sync object that owns the task list state and publishes deltas.
sync: std::sync::Arc<crdt::TaskListSync>,
/// Id of the agent that created this handle; passed to the list when adding, claiming and completing tasks.
agent_id: identity::AgentId,
/// Gossip peer id of the local runtime, passed to list operations and delta publishing.
peer_id: saorsa_gossip_types::PeerId,
}
/// Debug output shows only the agent and peer ids; the sync object is elided.
impl std::fmt::Debug for TaskListHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("TaskListHandle");
        out.field("agent_id", &self.agent_id);
        out.field("peer_id", &self.peer_id);
        out.finish_non_exhaustive()
    }
}
impl TaskListHandle {
pub async fn add_task(
&self,
title: String,
description: String,
) -> error::Result<crdt::TaskId> {
let (task_id, delta) = {
let mut list = self.sync.write().await;
let seq = list.next_seq();
let task_id = crdt::TaskId::new(&title, &self.agent_id, seq);
let metadata = crdt::TaskMetadata::new(title, description, 128, self.agent_id, seq);
let task = crdt::TaskItem::new(task_id, metadata, self.peer_id);
list.add_task(task.clone(), self.peer_id, seq)
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"add_task failed: {}",
e
)))
})?;
let tag = (self.peer_id, seq);
let delta = crdt::TaskListDelta::for_add(task_id, task, tag, list.current_version());
(task_id, delta)
};
if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
tracing::warn!("failed to publish add_task delta: {}", e);
}
Ok(task_id)
}
pub async fn claim_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
let delta = {
let mut list = self.sync.write().await;
let seq = list.next_seq();
list.claim_task(&task_id, self.agent_id, self.peer_id, seq)
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"claim_task failed: {}",
e
)))
})?;
let full_task = list
.get_task(&task_id)
.ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"task disappeared after claim",
))
})?
.clone();
crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
};
if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
tracing::warn!("failed to publish claim_task delta: {}", e);
}
Ok(())
}
pub async fn complete_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
let delta = {
let mut list = self.sync.write().await;
let seq = list.next_seq();
list.complete_task(&task_id, self.agent_id, self.peer_id, seq)
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"complete_task failed: {}",
e
)))
})?;
let full_task = list
.get_task(&task_id)
.ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"task disappeared after complete",
))
})?
.clone();
crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
};
if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
tracing::warn!("failed to publish complete_task delta: {}", e);
}
Ok(())
}
pub async fn list_tasks(&self) -> error::Result<Vec<TaskSnapshot>> {
let list = self.sync.read().await;
let tasks = list.tasks_ordered();
Ok(tasks
.into_iter()
.map(|task| TaskSnapshot {
id: *task.id(),
title: task.title().to_string(),
description: task.description().to_string(),
state: task.current_state(),
assignee: task.assignee().copied(),
owner: None,
priority: task.priority(),
})
.collect())
}
pub async fn reorder(&self, task_ids: Vec<crdt::TaskId>) -> error::Result<()> {
let delta = {
let mut list = self.sync.write().await;
list.reorder(task_ids.clone(), self.peer_id).map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"reorder failed: {}",
e
)))
})?;
crdt::TaskListDelta::for_reorder(task_ids, list.current_version())
};
if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
tracing::warn!("failed to publish reorder delta: {}", e);
}
Ok(())
}
}
impl Agent {
pub async fn create_kv_store(&self, name: &str, topic: &str) -> error::Result<KvStoreHandle> {
let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"gossip runtime not initialized - configure agent with network first",
))
})?;
let peer_id = runtime.peer_id();
let store_id = kv::KvStoreId::from_content(name, &self.agent_id());
let store = kv::KvStore::new(
store_id,
name.to_string(),
self.agent_id(),
kv::AccessPolicy::Signed,
);
let sync = kv::KvStoreSync::new(
store,
std::sync::Arc::clone(runtime.pubsub()),
topic.to_string(),
30,
)
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"kv store sync creation failed: {e}",
)))
})?;
let sync = std::sync::Arc::new(sync);
sync.start().await.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"kv store sync start failed: {e}",
)))
})?;
Ok(KvStoreHandle {
sync,
agent_id: self.agent_id(),
peer_id,
})
}
pub async fn join_kv_store(&self, topic: &str) -> error::Result<KvStoreHandle> {
let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
error::IdentityError::Storage(std::io::Error::other(
"gossip runtime not initialized - configure agent with network first",
))
})?;
let peer_id = runtime.peer_id();
let store_id = kv::KvStoreId::from_content(topic, &self.agent_id());
let store = kv::KvStore::new(
store_id,
String::new(),
self.agent_id(),
kv::AccessPolicy::Encrypted {
group_id: Vec::new(),
},
);
let sync = kv::KvStoreSync::new(
store,
std::sync::Arc::clone(runtime.pubsub()),
topic.to_string(),
30,
)
.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"kv store sync creation failed: {e}",
)))
})?;
let sync = std::sync::Arc::new(sync);
sync.start().await.map_err(|e| {
error::IdentityError::Storage(std::io::Error::other(format!(
"kv store sync start failed: {e}",
)))
})?;
Ok(KvStoreHandle {
sync,
agent_id: self.agent_id(),
peer_id,
})
}
}
/// Cloneable handle to a key-value store and its sync wrapper.
///
/// Returned by `Agent::create_kv_store` and `Agent::join_kv_store`.
#[derive(Clone)]
pub struct KvStoreHandle {
/// Shared sync wrapper used for read/write access to the store and for
/// publishing deltas.
sync: std::sync::Arc<kv::KvStoreSync>,
/// Id of the agent that opened this handle.
agent_id: identity::AgentId,
/// Gossip peer id passed to store mutations and delta publication.
peer_id: saorsa_gossip_types::PeerId,
}
/// Debug output shows only the agent and peer ids; the `sync` field is
/// omitted and the struct is rendered as non-exhaustive.
impl std::fmt::Debug for KvStoreHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("KvStoreHandle");
        builder.field("agent_id", &self.agent_id);
        builder.field("peer_id", &self.peer_id);
        builder.finish_non_exhaustive()
    }
}
impl KvStoreHandle {
    /// Stores `value` under `key` with the given `content_type`, then publishes
    /// the change as a delta.
    ///
    /// # Errors
    ///
    /// Returns a storage error (`"kv put failed: …"`) when the local store
    /// rejects the write. A failure to publish the delta is only logged as a
    /// warning; the call still returns `Ok(())`.
    pub async fn put(
        &self,
        key: String,
        value: Vec<u8>,
        content_type: String,
    ) -> error::Result<()> {
        // The write guard lives only inside this block, so it is released
        // before the delta is published.
        let delta = {
            let mut store = self.sync.write().await;
            // Only `key` is needed again below; `value` and `content_type`
            // are moved into the store instead of being cloned.
            store
                .put(key.clone(), value, content_type, self.peer_id)
                .map_err(|e| {
                    error::IdentityError::Storage(std::io::Error::other(format!(
                        "kv put failed: {e}",
                    )))
                })?;
            let entry = store.get(&key).cloned();
            let version = store.current_version();
            let Some(entry) = entry else {
                // The entry should exist right after a successful put. The
                // call stays best-effort, but the anomaly is no longer silent.
                tracing::warn!("kv put succeeded but entry is missing; skipping delta publish");
                return Ok(());
            };
            kv::KvStoreDelta::for_put(key, entry, (self.peer_id, store.next_seq()), version)
        };
        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
            tracing::warn!("failed to publish kv put delta: {e}");
        }
        Ok(())
    }

    /// Looks up `key` and returns a snapshot of its entry, or `None` when the
    /// store has no entry for it.
    ///
    /// The snapshot's `content_hash` is the entry's hash hex-encoded.
    pub async fn get(&self, key: &str) -> error::Result<Option<KvEntrySnapshot>> {
        let store = self.sync.read().await;
        Ok(store.get(key).map(|e| KvEntrySnapshot {
            key: e.key.clone(),
            value: e.value.clone(),
            content_hash: hex::encode(e.content_hash),
            content_type: e.content_type.clone(),
            metadata: e.metadata.clone(),
            created_at: e.created_at,
            updated_at: e.updated_at,
        }))
    }

    /// Removes `key` from the local store, then publishes the removal as a delta.
    ///
    /// # Errors
    ///
    /// Returns a storage error (`"kv remove failed: …"`) when the local store
    /// rejects the removal. A failure to publish the delta is only logged as a
    /// warning; the call still returns `Ok(())`.
    pub async fn remove(&self, key: &str) -> error::Result<()> {
        let delta = {
            let mut store = self.sync.write().await;
            store.remove(key).map_err(|e| {
                error::IdentityError::Storage(std::io::Error::other(format!(
                    "kv remove failed: {e}",
                )))
            })?;
            let mut d = kv::KvStoreDelta::new(store.current_version());
            // NOTE(review): the removal is recorded with an empty set — confirm
            // that receivers interpret an empty set as intended.
            d.removed
                .insert(key.to_string(), std::collections::HashSet::new());
            d
        };
        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
            tracing::warn!("failed to publish kv remove delta: {e}");
        }
        Ok(())
    }

    /// Returns a snapshot of every entry yielded by the store's `active_entries()`.
    ///
    /// Despite the name, full entry snapshots (including values) are returned,
    /// not just the key strings.
    pub async fn keys(&self) -> error::Result<Vec<KvEntrySnapshot>> {
        let store = self.sync.read().await;
        Ok(store
            .active_entries()
            .into_iter()
            .map(|e| KvEntrySnapshot {
                key: e.key.clone(),
                value: e.value.clone(),
                content_hash: hex::encode(e.content_hash),
                content_type: e.content_type.clone(),
                metadata: e.metadata.clone(),
                created_at: e.created_at,
                updated_at: e.updated_at,
            })
            .collect())
    }

    /// Returns the store's name as an owned string.
    pub async fn name(&self) -> error::Result<String> {
        let store = self.sync.read().await;
        Ok(store.name().to_string())
    }
}
/// Owned, serializable copy of a single key-value store entry.
///
/// Produced by `KvStoreHandle::get` and `KvStoreHandle::keys`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct KvEntrySnapshot {
/// Key the entry is stored under.
pub key: String,
/// Raw value bytes.
pub value: Vec<u8>,
/// Hex-encoded content hash of the entry.
pub content_hash: String,
/// Content type recorded with the entry.
pub content_type: String,
/// String metadata copied from the entry.
pub metadata: std::collections::HashMap<String, String>,
/// Creation timestamp copied from the entry (unit not visible here — TODO confirm).
pub created_at: u64,
/// Last-update timestamp copied from the entry (unit not visible here — TODO confirm).
pub updated_at: u64,
}
/// Owned copy of a task's fields, as returned by `list_tasks`.
#[derive(Debug, Clone)]
pub struct TaskSnapshot {
/// Task identifier.
pub id: crdt::TaskId,
/// Task title.
pub title: String,
/// Task description.
pub description: String,
/// Checkbox state of the task at snapshot time.
pub state: crdt::CheckboxState,
/// Agent the task is assigned to, if any.
pub assignee: Option<identity::AgentId>,
/// Owning user, if any. `list_tasks` currently always sets this to `None`.
pub owner: Option<identity::UserId>,
/// Task priority (scale not visible here — TODO confirm).
pub priority: u8,
}
/// Crate version, taken from the `CARGO_PKG_VERSION` environment variable at compile time.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Project name. The unit tests require it to be a three-byte ASCII-alphanumeric palindrome.
pub const NAME: &str = "x0x";
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an agent through the builder with the default network configuration.
    async fn networked_agent() -> Agent {
        Agent::builder()
            .with_network_config(network::NetworkConfig::default())
            .build()
            .await
            .unwrap()
    }

    /// `NAME` reads the same forwards and backwards.
    #[test]
    fn name_is_palindrome() {
        let mirrored = NAME.chars().rev().collect::<String>();
        assert_eq!(NAME, mirrored, "x0x must be a palindrome");
    }

    /// `NAME` occupies exactly three bytes.
    #[test]
    fn name_is_three_bytes() {
        let byte_len = NAME.len();
        assert_eq!(byte_len, 3, "x0x must be exactly three bytes");
    }

    /// `NAME` contains only ASCII letters and digits.
    #[test]
    fn name_is_ai_native() {
        assert!(NAME.bytes().all(|b| b.is_ascii_alphanumeric()));
    }

    /// `Agent::new` succeeds.
    #[tokio::test]
    async fn agent_creates() {
        assert!(Agent::new().await.is_ok());
    }

    /// `join_network` succeeds on an agent made with `Agent::new`.
    #[tokio::test]
    async fn agent_joins_network() {
        let agent = Agent::new().await.unwrap();
        let joined = agent.join_network().await;
        assert!(joined.is_ok());
    }

    /// `subscribe` returns an error on an agent made with `Agent::new`
    /// (presumably because no network was configured — confirm).
    #[tokio::test]
    async fn agent_subscribes() {
        let agent = Agent::new().await.unwrap();
        let outcome = agent.subscribe("test-topic").await;
        assert!(outcome.is_err());
    }

    /// An announcement built without user identity carries the agent's ids,
    /// has no user fields, and passes `verify()`.
    #[tokio::test]
    async fn identity_announcement_machine_signature_verifies() {
        let agent = networked_agent().await;
        let signed = agent.build_identity_announcement(false, false).unwrap();
        assert_eq!(signed.agent_id, agent.agent_id());
        assert_eq!(signed.machine_id, agent.machine_id());
        assert!(signed.user_id.is_none());
        assert!(signed.agent_certificate.is_none());
        assert!(signed.verify().is_ok());
    }

    /// Including the user identity without consent is rejected.
    #[tokio::test]
    async fn identity_announcement_requires_human_consent() {
        let agent = networked_agent().await;
        let err = agent.build_identity_announcement(true, false).unwrap_err();
        let consent_mentioned = err.to_string().contains("explicit human consent");
        assert!(consent_mentioned, "unexpected error: {err}");
    }

    /// Including the user identity with consent still fails when the agent has
    /// no user identity configured.
    #[tokio::test]
    async fn identity_announcement_with_user_requires_user_identity() {
        let agent = networked_agent().await;
        let err = agent.build_identity_announcement(true, true).unwrap_err();
        let missing_user_mentioned = err.to_string().contains("no user identity is configured");
        assert!(missing_user_mentioned, "unexpected error: {err}");
    }

    /// After announcing, the agent finds its own entry in the discovery cache.
    #[tokio::test]
    async fn announce_identity_populates_discovery_cache() {
        let user_key = identity::UserKeypair::generate().unwrap();
        let builder = Agent::builder()
            .with_network_config(network::NetworkConfig::default())
            .with_user_key(user_key);
        let agent = builder.build().await.unwrap();

        agent.announce_identity(true, true).await.unwrap();

        let entry = agent
            .discovered_agent(agent.agent_id())
            .await
            .unwrap()
            .expect("agent should discover its own announcement");
        assert_eq!(entry.agent_id, agent.agent_id());
        assert_eq!(entry.machine_id, agent.machine_id());
        assert_eq!(entry.user_id, agent.user_id());
    }

    /// Bytes serialized from the pre-NAT-field layout must fail to decode as
    /// the current `IdentityAnnouncementUnsigned`.
    #[test]
    fn identity_announcement_backward_compat_no_nat_fields() {
        use identity::{AgentId, MachineId};

        // Mirror of the announcement layout without the NAT-related fields.
        #[derive(serde::Serialize, serde::Deserialize)]
        struct OldIdentityAnnouncementUnsigned {
            agent_id: AgentId,
            machine_id: MachineId,
            user_id: Option<identity::UserId>,
            agent_certificate: Option<identity::AgentCertificate>,
            machine_public_key: Vec<u8>,
            addresses: Vec<std::net::SocketAddr>,
            announced_at: u64,
        }

        let legacy = OldIdentityAnnouncementUnsigned {
            agent_id: AgentId([1u8; 32]),
            machine_id: MachineId([2u8; 32]),
            user_id: None,
            agent_certificate: None,
            machine_public_key: vec![0u8; 10],
            addresses: Vec::new(),
            announced_at: 1234,
        };
        let encoded = bincode::serialize(&legacy).expect("serialize old announcement");
        let decoded: Result<IdentityAnnouncementUnsigned, _> = bincode::deserialize(&encoded);
        assert!(
            decoded.is_err(),
            "Old-format announcement should not decode as new struct (protocol upgrade required)"
        );
    }

    /// Populated NAT fields survive a bincode round trip.
    #[test]
    fn identity_announcement_nat_fields_round_trip() {
        use identity::{AgentId, MachineId};

        let original = IdentityAnnouncementUnsigned {
            agent_id: AgentId([1u8; 32]),
            machine_id: MachineId([2u8; 32]),
            user_id: None,
            agent_certificate: None,
            machine_public_key: vec![0u8; 10],
            addresses: Vec::new(),
            announced_at: 9999,
            nat_type: Some("FullCone".to_string()),
            can_receive_direct: Some(true),
            is_relay: Some(false),
            is_coordinator: Some(true),
        };
        let encoded = bincode::serialize(&original).expect("serialize");
        let round_tripped =
            bincode::deserialize::<IdentityAnnouncementUnsigned>(&encoded).expect("deserialize");

        assert_eq!(round_tripped.nat_type.as_deref(), Some("FullCone"));
        assert_eq!(round_tripped.can_receive_direct, Some(true));
        assert_eq!(round_tripped.is_relay, Some(false));
        assert_eq!(round_tripped.is_coordinator, Some(true));
    }

    /// Absent NAT fields stay absent after a bincode round trip.
    #[test]
    fn identity_announcement_no_nat_fields_round_trip() {
        use identity::{AgentId, MachineId};

        let original = IdentityAnnouncementUnsigned {
            agent_id: AgentId([3u8; 32]),
            machine_id: MachineId([4u8; 32]),
            user_id: None,
            agent_certificate: None,
            machine_public_key: vec![0u8; 10],
            addresses: Vec::new(),
            announced_at: 42,
            nat_type: None,
            can_receive_direct: None,
            is_relay: None,
            is_coordinator: None,
        };
        let encoded = bincode::serialize(&original).expect("serialize");
        let round_tripped =
            bincode::deserialize::<IdentityAnnouncementUnsigned>(&encoded).expect("deserialize");

        assert!(round_tripped.nat_type.is_none());
        assert!(round_tripped.can_receive_direct.is_none());
        assert!(round_tripped.is_relay.is_none());
        assert!(round_tripped.is_coordinator.is_none());
    }
}