1#![allow(clippy::unwrap_used)]
2#![allow(clippy::expect_used)]
3#![allow(missing_docs)]
4
5pub mod error;
55
56pub mod identity;
62
63pub mod storage;
68
69pub mod bootstrap;
74pub mod network;
76
77pub mod contacts;
79
80pub mod trust;
85
86pub mod connectivity;
91
92pub mod gossip;
94
95pub mod crdt;
97
98pub mod kv;
100
101pub mod groups;
103
104pub mod mls;
106
107pub mod direct;
112
113pub mod dm;
117
118pub mod dm_capability;
122
123pub mod dm_capability_service;
126
127pub mod dm_inbox;
131
132pub mod dm_send;
135
136pub mod presence;
138
139pub mod upgrade;
141
142pub mod files;
144
145pub mod constitution;
147
148pub mod api;
150
151pub mod cli;
153
154pub use gossip::{
156 GossipConfig, GossipRuntime, PubSubManager, PubSubMessage, PubSubStats, PubSubStatsSnapshot,
157 SigningContext, Subscription,
158};
159
160pub use direct::{DirectMessage, DirectMessageReceiver, DirectMessaging};
162
163use saorsa_gossip_membership::Membership as _;
165
166pub struct Agent {
188 identity: std::sync::Arc<identity::Identity>,
189 #[allow(dead_code)]
191 network: Option<std::sync::Arc<network::NetworkNode>>,
192 gossip_runtime: Option<std::sync::Arc<gossip::GossipRuntime>>,
194 bootstrap_cache: Option<std::sync::Arc<ant_quic::BootstrapCache>>,
196 gossip_cache_adapter: Option<saorsa_gossip_coordinator::GossipCacheAdapter>,
198 identity_discovery_cache: std::sync::Arc<
200 tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
201 >,
202 identity_listener_started: std::sync::atomic::AtomicBool,
204 heartbeat_interval_secs: u64,
206 identity_ttl_secs: u64,
208 heartbeat_handle: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
210 rendezvous_advertised: std::sync::atomic::AtomicBool,
212 contact_store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>,
214 direct_messaging: std::sync::Arc<direct::DirectMessaging>,
216 network_event_listener_started: std::sync::atomic::AtomicBool,
218 direct_listener_started: std::sync::atomic::AtomicBool,
220 presence: Option<std::sync::Arc<presence::PresenceWrapper>>,
222 user_identity_consented: std::sync::Arc<std::sync::atomic::AtomicBool>,
226 capability_store: std::sync::Arc<dm_capability::CapabilityStore>,
229 dm_capabilities_tx: std::sync::Arc<tokio::sync::watch::Sender<dm::DmCapabilities>>,
234 dm_inflight_acks: std::sync::Arc<dm::InFlightAcks>,
236 recent_delivery_cache: std::sync::Arc<dm::RecentDeliveryCache>,
238 capability_advert_service:
240 tokio::sync::Mutex<Option<dm_capability_service::CapabilityAdvertService>>,
241 dm_inbox_service: tokio::sync::Mutex<Option<dm_inbox::DmInboxService>>,
243}
244
245impl std::fmt::Debug for Agent {
246 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247 f.debug_struct("Agent")
248 .field("identity", &self.identity)
249 .field("network", &self.network.is_some())
250 .field("gossip_runtime", &self.gossip_runtime.is_some())
251 .field("bootstrap_cache", &self.bootstrap_cache.is_some())
252 .field("gossip_cache_adapter", &self.gossip_cache_adapter.is_some())
253 .finish()
254 }
255}
256
257#[derive(Debug, Clone)]
259pub struct Message {
260 pub origin: String,
262 pub payload: Vec<u8>,
264 pub topic: String,
266}
267
268pub const IDENTITY_ANNOUNCE_TOPIC: &str = "x0x.identity.announce.v1";
270
271#[must_use]
281pub fn shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
282 let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
283 format!("x0x.identity.shard.{shard}")
284}
285
286pub const RENDEZVOUS_SHARD_TOPIC_PREFIX: &str = "x0x.rendezvous.shard";
288
289#[must_use]
295pub fn rendezvous_shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
296 let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
297 format!("{RENDEZVOUS_SHARD_TOPIC_PREFIX}.{shard}")
298}
299
300fn is_globally_routable(ip: std::net::IpAddr) -> bool {
307 match ip {
308 std::net::IpAddr::V4(v4) => {
309 !v4.is_private() && !v4.is_loopback() && !v4.is_link_local() && !v4.is_unspecified() && !v4.is_broadcast() && !v4.is_documentation() && !(v4.octets()[0] == 100 && (v4.octets()[1] & 0xC0) == 64)
317 }
318 std::net::IpAddr::V6(v6) => {
319 let segs = v6.segments();
320 !v6.is_loopback() && !v6.is_unspecified() && (segs[0] & 0xffc0) != 0xfe80 && (segs[0] & 0xfe00) != 0xfc00 && (segs[0] & 0xfff0) != 0xfec0 }
326 }
327}
328
329pub fn is_publicly_advertisable(addr: std::net::SocketAddr) -> bool {
341 addr.port() > 0 && is_globally_routable(addr.ip())
342}
343
344pub fn collect_local_interface_addrs(port: u16) -> Vec<std::net::SocketAddr> {
345 fn is_cgnat(v4: std::net::Ipv4Addr) -> bool {
346 v4.octets()[0] == 100 && (v4.octets()[1] & 0xC0) == 64
347 }
348
349 fn addr_priority(ip: std::net::IpAddr) -> u8 {
350 match ip {
351 std::net::IpAddr::V4(v4) => {
352 if is_globally_routable(std::net::IpAddr::V4(v4)) {
353 0
354 } else if is_cgnat(v4) {
355 1
356 } else {
357 2
358 }
359 }
360 std::net::IpAddr::V6(v6) => {
361 if is_globally_routable(std::net::IpAddr::V6(v6)) {
362 3
363 } else {
364 4
365 }
366 }
367 }
368 }
369
370 let mut ranked = Vec::new();
371
372 let interfaces = match if_addrs::get_if_addrs() {
373 Ok(interfaces) => interfaces,
374 Err(_) => return Vec::new(),
375 };
376
377 for iface in interfaces {
378 let ip = iface.ip();
379 if ip.is_unspecified() || ip.is_loopback() {
380 continue;
381 }
382
383 let addr = match ip {
384 std::net::IpAddr::V4(v4) => {
385 if v4.is_link_local() {
386 continue;
387 }
388 std::net::SocketAddr::new(std::net::IpAddr::V4(v4), port)
389 }
390 std::net::IpAddr::V6(v6) => {
391 let segs = v6.segments();
392 let is_link_local = (segs[0] & 0xffc0) == 0xfe80;
393 if is_link_local {
394 continue;
395 }
396 std::net::SocketAddr::new(std::net::IpAddr::V6(v6), port)
397 }
398 };
399
400 if !ranked.iter().any(|(_, existing)| *existing == addr) {
401 ranked.push((addr_priority(addr.ip()), addr));
402 }
403 }
404
405 ranked.sort_by_key(|(priority, addr)| (*priority, addr.is_ipv6()));
406 ranked.into_iter().map(|(_, addr)| addr).collect()
407}
408
409pub const IDENTITY_HEARTBEAT_INTERVAL_SECS: u64 = 60;
420
421pub const IDENTITY_TTL_SECS: u64 = 900;
426
427#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
428struct IdentityAnnouncementUnsigned {
429 agent_id: identity::AgentId,
430 machine_id: identity::MachineId,
431 user_id: Option<identity::UserId>,
432 agent_certificate: Option<identity::AgentCertificate>,
433 machine_public_key: Vec<u8>,
434 addresses: Vec<std::net::SocketAddr>,
435 announced_at: u64,
436 nat_type: Option<String>,
438 can_receive_direct: Option<bool>,
440 is_relay: Option<bool>,
442 is_coordinator: Option<bool>,
444}
445
446#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
451pub struct IdentityAnnouncement {
452 pub agent_id: identity::AgentId,
454 pub machine_id: identity::MachineId,
456 pub user_id: Option<identity::UserId>,
458 pub agent_certificate: Option<identity::AgentCertificate>,
460 pub machine_public_key: Vec<u8>,
462 pub machine_signature: Vec<u8>,
464 pub addresses: Vec<std::net::SocketAddr>,
466 pub announced_at: u64,
468 pub nat_type: Option<String>,
471 pub can_receive_direct: Option<bool>,
474 pub is_relay: Option<bool>,
477 pub is_coordinator: Option<bool>,
480}
481
482impl IdentityAnnouncement {
483 fn to_unsigned(&self) -> IdentityAnnouncementUnsigned {
484 IdentityAnnouncementUnsigned {
485 agent_id: self.agent_id,
486 machine_id: self.machine_id,
487 user_id: self.user_id,
488 agent_certificate: self.agent_certificate.clone(),
489 machine_public_key: self.machine_public_key.clone(),
490 addresses: self.addresses.clone(),
491 announced_at: self.announced_at,
492 nat_type: self.nat_type.clone(),
493 can_receive_direct: self.can_receive_direct,
494 is_relay: self.is_relay,
495 is_coordinator: self.is_coordinator,
496 }
497 }
498
499 pub fn verify(&self) -> error::Result<()> {
501 let machine_pub =
502 ant_quic::MlDsaPublicKey::from_bytes(&self.machine_public_key).map_err(|_| {
503 error::IdentityError::CertificateVerification(
504 "invalid machine public key in announcement".to_string(),
505 )
506 })?;
507 let derived_machine_id = identity::MachineId::from_public_key(&machine_pub);
508 if derived_machine_id != self.machine_id {
509 return Err(error::IdentityError::CertificateVerification(
510 "machine_id does not match machine public key".to_string(),
511 ));
512 }
513
514 let unsigned_bytes = bincode::serialize(&self.to_unsigned()).map_err(|e| {
515 error::IdentityError::Serialization(format!(
516 "failed to serialize announcement for verification: {e}"
517 ))
518 })?;
519 let signature = ant_quic::crypto::raw_public_keys::pqc::MlDsaSignature::from_bytes(
520 &self.machine_signature,
521 )
522 .map_err(|e| {
523 error::IdentityError::CertificateVerification(format!(
524 "invalid machine signature in announcement: {:?}",
525 e
526 ))
527 })?;
528 ant_quic::crypto::raw_public_keys::pqc::verify_with_ml_dsa(
529 &machine_pub,
530 &unsigned_bytes,
531 &signature,
532 )
533 .map_err(|e| {
534 error::IdentityError::CertificateVerification(format!(
535 "machine signature verification failed: {:?}",
536 e
537 ))
538 })?;
539
540 match (self.user_id, self.agent_certificate.as_ref()) {
541 (Some(user_id), Some(cert)) => {
542 cert.verify()?;
543 let cert_agent_id = cert.agent_id()?;
544 if cert_agent_id != self.agent_id {
545 return Err(error::IdentityError::CertificateVerification(
546 "agent certificate agent_id mismatch".to_string(),
547 ));
548 }
549 let cert_user_id = cert.user_id()?;
550 if cert_user_id != user_id {
551 return Err(error::IdentityError::CertificateVerification(
552 "agent certificate user_id mismatch".to_string(),
553 ));
554 }
555 Ok(())
556 }
557 (None, None) => Ok(()),
558 _ => Err(error::IdentityError::CertificateVerification(
559 "user identity disclosure requires matching certificate".to_string(),
560 )),
561 }
562 }
563}
564
565#[derive(Debug, Clone)]
567pub struct DiscoveredAgent {
568 pub agent_id: identity::AgentId,
570 pub machine_id: identity::MachineId,
572 pub user_id: Option<identity::UserId>,
574 pub addresses: Vec<std::net::SocketAddr>,
576 pub announced_at: u64,
578 pub last_seen: u64,
580 #[doc(hidden)]
585 pub machine_public_key: Vec<u8>,
586 pub nat_type: Option<String>,
589 pub can_receive_direct: Option<bool>,
592 pub is_relay: Option<bool>,
595 pub is_coordinator: Option<bool>,
598}
599
600#[derive(Debug)]
637pub struct AgentBuilder {
638 machine_key_path: Option<std::path::PathBuf>,
639 agent_keypair: Option<identity::AgentKeypair>,
640 agent_key_path: Option<std::path::PathBuf>,
641 agent_cert_path: Option<std::path::PathBuf>,
648 user_keypair: Option<identity::UserKeypair>,
649 user_key_path: Option<std::path::PathBuf>,
650 #[allow(dead_code)]
651 network_config: Option<network::NetworkConfig>,
652 peer_cache_dir: Option<std::path::PathBuf>,
653 disable_peer_cache: bool,
656 heartbeat_interval_secs: Option<u64>,
657 identity_ttl_secs: Option<u64>,
658 presence_beacon_interval_secs: Option<u64>,
659 presence_event_poll_interval_secs: Option<u64>,
660 presence_offline_timeout_secs: Option<u64>,
661 contact_store_path: Option<std::path::PathBuf>,
663}
664
665struct HeartbeatContext {
667 identity: std::sync::Arc<identity::Identity>,
668 runtime: std::sync::Arc<gossip::GossipRuntime>,
669 network: std::sync::Arc<network::NetworkNode>,
670 interval_secs: u64,
671 cache: std::sync::Arc<
672 tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
673 >,
674 user_identity_consented: std::sync::Arc<std::sync::atomic::AtomicBool>,
678}
679
680impl HeartbeatContext {
681 async fn announce(&self) -> error::Result<()> {
682 let machine_public_key = self
683 .identity
684 .machine_keypair()
685 .public_key()
686 .as_bytes()
687 .to_vec();
688 let announced_at = Agent::unix_timestamp_secs();
689
690 let mut addresses = match self.network.node_status().await {
693 Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
694 _ => match self.network.routable_addr().await {
695 Some(addr) => vec![addr],
696 None => Vec::new(),
697 },
698 };
699
700 let bind_port = self
709 .network
710 .bound_addr()
711 .await
712 .map(|a| a.port())
713 .unwrap_or(5483);
714 if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
715 if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
716 if let Ok(local) = sock.local_addr() {
717 if let std::net::IpAddr::V6(v6) = local.ip() {
718 let segs = v6.segments();
719 let is_global = (segs[0] & 0xffc0) != 0xfe80
720 && (segs[0] & 0xff00) != 0xfd00
721 && !v6.is_loopback();
722 if is_global {
723 let v6_addr =
724 std::net::SocketAddr::new(std::net::IpAddr::V6(v6), bind_port);
725 if !addresses.contains(&v6_addr) {
726 addresses.push(v6_addr);
727 }
728 }
729 }
730 }
731 }
732 }
733
734 for addr in collect_local_interface_addrs(bind_port) {
735 if !addresses.contains(&addr) {
736 addresses.push(addr);
737 }
738 }
739
740 addresses.retain(|a| is_publicly_advertisable(*a));
747
748 let (nat_type, can_receive_direct, is_relay, is_coordinator) =
750 match self.network.node_status().await {
751 Some(status) => (
752 Some(status.nat_type.to_string()),
753 Some(status.can_receive_direct),
754 Some(status.is_relaying),
755 Some(status.is_coordinating),
756 ),
757 None => (None, None, None, None),
758 };
759
760 let include_user = self
764 .user_identity_consented
765 .load(std::sync::atomic::Ordering::Acquire);
766 let (user_id, agent_certificate) = if include_user {
767 (
768 self.identity
769 .user_keypair()
770 .map(identity::UserKeypair::user_id),
771 self.identity.agent_certificate().cloned(),
772 )
773 } else {
774 (None, None)
775 };
776
777 let unsigned = IdentityAnnouncementUnsigned {
778 agent_id: self.identity.agent_id(),
779 machine_id: self.identity.machine_id(),
780 user_id,
781 agent_certificate,
782 machine_public_key: machine_public_key.clone(),
783 addresses,
784 announced_at,
785 nat_type: nat_type.clone(),
786 can_receive_direct,
787 is_relay,
788 is_coordinator,
789 };
790 let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
791 error::IdentityError::Serialization(format!(
792 "heartbeat: failed to serialize announcement: {e}"
793 ))
794 })?;
795 let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
796 self.identity.machine_keypair().secret_key(),
797 &unsigned_bytes,
798 )
799 .map_err(|e| {
800 error::IdentityError::Storage(std::io::Error::other(format!(
801 "heartbeat: failed to sign announcement: {:?}",
802 e
803 )))
804 })?
805 .as_bytes()
806 .to_vec();
807
808 let announcement = IdentityAnnouncement {
809 agent_id: unsigned.agent_id,
810 machine_id: unsigned.machine_id,
811 user_id: unsigned.user_id,
812 agent_certificate: unsigned.agent_certificate,
813 machine_public_key: machine_public_key.clone(),
814 machine_signature,
815 addresses: unsigned.addresses,
816 announced_at,
817 nat_type,
818 can_receive_direct,
819 is_relay,
820 is_coordinator,
821 };
822 let encoded = bincode::serialize(&announcement).map_err(|e| {
823 error::IdentityError::Serialization(format!(
824 "heartbeat: failed to serialize announcement: {e}"
825 ))
826 })?;
827 self.runtime
828 .pubsub()
829 .publish(
830 IDENTITY_ANNOUNCE_TOPIC.to_string(),
831 bytes::Bytes::from(encoded),
832 )
833 .await
834 .map_err(|e| {
835 error::IdentityError::Storage(std::io::Error::other(format!(
836 "heartbeat: publish failed: {e}"
837 )))
838 })?;
839 let now = Agent::unix_timestamp_secs();
840 self.cache.write().await.insert(
841 announcement.agent_id,
842 DiscoveredAgent {
843 agent_id: announcement.agent_id,
844 machine_id: announcement.machine_id,
845 user_id: announcement.user_id,
846 addresses: announcement.addresses,
847 announced_at: announcement.announced_at,
848 last_seen: now,
849 machine_public_key: machine_public_key.clone(),
850 nat_type: announcement.nat_type.clone(),
851 can_receive_direct: announcement.can_receive_direct,
852 is_relay: announcement.is_relay,
853 is_coordinator: announcement.is_coordinator,
854 },
855 );
856 Ok(())
857 }
858}
859
860impl Agent {
861 pub async fn new() -> error::Result<Self> {
868 Agent::builder().build().await
869 }
870
871 pub fn builder() -> AgentBuilder {
878 AgentBuilder {
879 machine_key_path: None,
880 agent_keypair: None,
881 agent_key_path: None,
882 agent_cert_path: None,
883 user_keypair: None,
884 user_key_path: None,
885 network_config: None,
886 peer_cache_dir: None,
887 disable_peer_cache: false,
888 heartbeat_interval_secs: None,
889 identity_ttl_secs: None,
890 presence_beacon_interval_secs: None,
891 presence_event_poll_interval_secs: None,
892 presence_offline_timeout_secs: None,
893 contact_store_path: None,
894 }
895 }
896
897 #[inline]
903 #[must_use]
904 pub fn identity(&self) -> &identity::Identity {
905 &self.identity
906 }
907
908 #[inline]
917 #[must_use]
918 pub fn machine_id(&self) -> identity::MachineId {
919 self.identity.machine_id()
920 }
921
922 #[inline]
932 #[must_use]
933 pub fn agent_id(&self) -> identity::AgentId {
934 self.identity.agent_id()
935 }
936
937 #[inline]
942 #[must_use]
943 pub fn user_id(&self) -> Option<identity::UserId> {
944 self.identity.user_id()
945 }
946
947 #[inline]
951 #[must_use]
952 pub fn agent_certificate(&self) -> Option<&identity::AgentCertificate> {
953 self.identity.agent_certificate()
954 }
955
956 #[must_use]
958 pub fn network(&self) -> Option<&std::sync::Arc<network::NetworkNode>> {
959 self.network.as_ref()
960 }
961
962 pub fn gossip_cache_adapter(&self) -> Option<&saorsa_gossip_coordinator::GossipCacheAdapter> {
967 self.gossip_cache_adapter.as_ref()
968 }
969
970 #[must_use]
977 pub fn gossip_stats(&self) -> Option<gossip::PubSubStatsSnapshot> {
978 self.gossip_runtime.as_ref().map(|rt| rt.pubsub().stats())
979 }
980
981 #[must_use]
987 pub fn presence_system(&self) -> Option<&std::sync::Arc<presence::PresenceWrapper>> {
988 self.presence.as_ref()
989 }
990
991 #[must_use]
999 pub fn contacts(&self) -> &std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>> {
1000 &self.contact_store
1001 }
1002
1003 pub async fn reachability(
1009 &self,
1010 agent_id: &identity::AgentId,
1011 ) -> Option<connectivity::ReachabilityInfo> {
1012 let cache = self.identity_discovery_cache.read().await;
1013 cache
1014 .get(agent_id)
1015 .map(connectivity::ReachabilityInfo::from_discovered)
1016 }
1017
1018 async fn seed_transport_peer_hints_for_target(
1019 &self,
1020 network: &network::NetworkNode,
1021 target: &DiscoveredAgent,
1022 ) -> error::Result<()> {
1023 fn merge_helper_hint(
1024 hints: &mut std::collections::HashMap<
1025 ant_quic::PeerId,
1026 (
1027 Vec<std::net::SocketAddr>,
1028 ant_quic::bootstrap_cache::PeerCapabilities,
1029 ),
1030 >,
1031 peer_id: ant_quic::PeerId,
1032 addrs: impl IntoIterator<Item = std::net::SocketAddr>,
1033 supports_coordination: bool,
1034 supports_relay: bool,
1035 ) {
1036 let entry = hints.entry(peer_id).or_insert_with(|| {
1037 (
1038 Vec::new(),
1039 ant_quic::bootstrap_cache::PeerCapabilities::default(),
1040 )
1041 });
1042 for addr in addrs {
1043 if !entry.0.contains(&addr) {
1044 entry.0.push(addr);
1045 }
1046 }
1047 if supports_coordination {
1048 entry.1.supports_coordination = true;
1049 }
1050 if supports_relay {
1051 entry.1.supports_relay = true;
1052 }
1053 }
1054
1055 let target_peer_id = ant_quic::PeerId(target.machine_id.0);
1056 if target.machine_id.0 != [0u8; 32] {
1057 network
1058 .upsert_peer_hints(target_peer_id, target.addresses.clone(), None)
1059 .await
1060 .map_err(|e| {
1061 error::IdentityError::Storage(std::io::Error::other(format!(
1062 "failed to upsert target peer hints: {e}"
1063 )))
1064 })?;
1065 }
1066
1067 let mut helper_hints: std::collections::HashMap<
1068 ant_quic::PeerId,
1069 (
1070 Vec<std::net::SocketAddr>,
1071 ant_quic::bootstrap_cache::PeerCapabilities,
1072 ),
1073 > = std::collections::HashMap::new();
1074
1075 if let Some(ref cache) = self.bootstrap_cache {
1076 for peer in cache.select_coordinators(6).await {
1077 merge_helper_hint(
1078 &mut helper_hints,
1079 peer.peer_id,
1080 peer.preferred_addresses(),
1081 true,
1082 false,
1083 );
1084 }
1085 for peer in cache.select_relay_peers(6).await {
1086 merge_helper_hint(
1087 &mut helper_hints,
1088 peer.peer_id,
1089 peer.preferred_addresses(),
1090 false,
1091 true,
1092 );
1093 }
1094 }
1095
1096 if let Some(ref adapter) = self.gossip_cache_adapter {
1097 let mut adverts = adapter.get_all_adverts();
1098 adverts.sort_by_key(|a| std::cmp::Reverse(a.score));
1099 for advert in adverts.into_iter().take(12) {
1100 let advert_peer_id = ant_quic::PeerId(*advert.peer.as_bytes());
1101 if advert_peer_id == target_peer_id {
1102 continue;
1103 }
1104 merge_helper_hint(
1105 &mut helper_hints,
1106 advert_peer_id,
1107 advert
1108 .addr_hints
1109 .into_iter()
1110 .map(|hint| hint.addr)
1111 .filter(|addr| is_publicly_advertisable(*addr)),
1112 advert.roles.coordinator || advert.roles.rendezvous,
1113 advert.roles.relay,
1114 );
1115 }
1116 }
1117
1118 let discovered: Vec<DiscoveredAgent> = {
1119 let cache = self.identity_discovery_cache.read().await;
1120 cache.values().cloned().collect()
1121 };
1122 for candidate in discovered {
1123 if candidate.agent_id == target.agent_id
1124 || candidate.machine_id == target.machine_id
1125 || candidate.machine_id.0 == [0u8; 32]
1126 {
1127 continue;
1128 }
1129 merge_helper_hint(
1130 &mut helper_hints,
1131 ant_quic::PeerId(candidate.machine_id.0),
1132 candidate.addresses.iter().copied(),
1133 candidate.is_coordinator == Some(true),
1134 candidate.is_relay == Some(true),
1135 );
1136 }
1137
1138 for (peer_id, (mut addrs, caps)) in helper_hints {
1139 addrs.retain(|addr| !target.addresses.contains(addr));
1140 if addrs.is_empty() && !caps.supports_coordination && !caps.supports_relay {
1141 continue;
1142 }
1143 network
1144 .upsert_peer_hints(peer_id, addrs, Some(caps))
1145 .await
1146 .map_err(|e| {
1147 error::IdentityError::Storage(std::io::Error::other(format!(
1148 "failed to upsert helper peer hints: {e}"
1149 )))
1150 })?;
1151 }
1152
1153 Ok(())
1154 }
1155
1156 pub async fn connect_to_agent(
1174 &self,
1175 agent_id: &identity::AgentId,
1176 ) -> error::Result<connectivity::ConnectOutcome> {
1177 let call_start = std::time::Instant::now();
1178 let agent_prefix = network::hex_prefix(&agent_id.0, 4);
1179 tracing::debug!(
1180 target: "x0x::connect",
1181 stage = "connect_to_agent",
1182 %agent_prefix,
1183 "begin"
1184 );
1185 let discovered = {
1187 let cache = self.identity_discovery_cache.read().await;
1188 cache.get(agent_id).cloned()
1189 };
1190
1191 let agent = match discovered {
1192 Some(a) => a,
1193 None => {
1194 tracing::info!(
1195 target: "x0x::connect",
1196 stage = "connect_to_agent",
1197 %agent_prefix,
1198 outcome = "not_found",
1199 dur_ms = call_start.elapsed().as_millis() as u64,
1200 "agent not in discovery cache"
1201 );
1202 return Ok(connectivity::ConnectOutcome::NotFound);
1203 }
1204 };
1205
1206 let info = connectivity::ReachabilityInfo::from_discovered(&agent);
1207 let v4_addrs = info.addresses.iter().filter(|a| a.is_ipv4()).count();
1208 let v6_addrs = info.addresses.len() - v4_addrs;
1209 tracing::info!(
1210 target: "x0x::connect",
1211 stage = "connect_to_agent",
1212 %agent_prefix,
1213 machine_prefix = %network::hex_prefix(&agent.machine_id.0, 4),
1214 addr_total = info.addresses.len(),
1215 v4_addrs,
1216 v6_addrs,
1217 can_receive_direct = ?info.can_receive_direct,
1218 should_attempt_direct = info.should_attempt_direct(),
1219 needs_coordination = info.needs_coordination(),
1220 "reachability classified"
1221 );
1222
1223 let Some(ref network) = self.network else {
1224 tracing::warn!(
1225 target: "x0x::connect",
1226 stage = "connect_to_agent",
1227 %agent_prefix,
1228 outcome = "unreachable_no_network",
1229 "network layer not initialised"
1230 );
1231 return Ok(connectivity::ConnectOutcome::Unreachable);
1232 };
1233
1234 let connected_machine_id = if agent.machine_id.0 != [0u8; 32]
1240 && network
1241 .is_connected(&ant_quic::PeerId(agent.machine_id.0))
1242 .await
1243 {
1244 Some(agent.machine_id)
1245 } else {
1246 match self.direct_messaging.get_machine_id(agent_id).await {
1247 Some(machine_id) if network.is_connected(&ant_quic::PeerId(machine_id.0)).await => {
1248 Some(machine_id)
1249 }
1250 _ => None,
1251 }
1252 };
1253 if let Some(machine_id) = connected_machine_id {
1254 if machine_id != agent.machine_id {
1255 let mut cache = self.identity_discovery_cache.write().await;
1256 if let Some(entry) = cache.get_mut(agent_id) {
1257 entry.machine_id = machine_id;
1258 }
1259 }
1260 self.direct_messaging
1261 .mark_connected(agent.agent_id, machine_id)
1262 .await;
1263 let dur_ms = call_start.elapsed().as_millis() as u64;
1264 return if let Some(addr) = info.addresses.first() {
1265 let family = if addr.is_ipv4() { "v4" } else { "v6" };
1266 tracing::info!(
1267 target: "x0x::connect",
1268 stage = "connect_to_agent",
1269 %agent_prefix,
1270 strategy = "already_connected",
1271 outcome = "direct",
1272 selected_addr = %addr,
1273 family,
1274 dur_ms,
1275 "reusing existing connection"
1276 );
1277 Ok(connectivity::ConnectOutcome::Direct(*addr))
1278 } else {
1279 tracing::info!(
1280 target: "x0x::connect",
1281 stage = "connect_to_agent",
1282 %agent_prefix,
1283 strategy = "already_connected",
1284 outcome = "already_connected",
1285 dur_ms,
1286 "reusing existing connection without known addr"
1287 );
1288 Ok(connectivity::ConnectOutcome::AlreadyConnected)
1289 };
1290 }
1291
1292 if info.addresses.is_empty() {
1293 tracing::info!(
1294 target: "x0x::connect",
1295 stage = "connect_to_agent",
1296 %agent_prefix,
1297 outcome = "unreachable",
1298 reason = "no_addresses",
1299 dur_ms = call_start.elapsed().as_millis() as u64,
1300 "no known addresses for agent"
1301 );
1302 return Ok(connectivity::ConnectOutcome::Unreachable);
1303 }
1304
1305 let dial_timeout = std::time::Duration::from_secs(8);
1306
1307 if agent.machine_id.0 != [0u8; 32] {
1311 let peer_id_hint = ant_quic::PeerId(agent.machine_id.0);
1312 self.seed_transport_peer_hints_for_target(network, &agent)
1313 .await
1314 .map_err(|e| {
1315 error::IdentityError::Storage(std::io::Error::other(format!(
1316 "failed to seed transport peer hints: {e}"
1317 )))
1318 })?;
1319
1320 match tokio::time::timeout(
1321 dial_timeout,
1322 network.connect_peer_with_addrs(peer_id_hint, info.addresses.clone()),
1323 )
1324 .await
1325 {
1326 Ok(Ok((addr, verified_peer_id))) => {
1327 let verified_machine_id = identity::MachineId(verified_peer_id.0);
1328 if let Some(ref bc) = self.bootstrap_cache {
1329 bc.add_from_connection(verified_peer_id, vec![addr], None)
1330 .await;
1331 bc.record_success(&verified_peer_id, 0).await;
1332 }
1333 {
1334 let mut cache = self.identity_discovery_cache.write().await;
1335 if let Some(entry) = cache.get_mut(agent_id) {
1336 entry.machine_id = verified_machine_id;
1337 }
1338 }
1339 self.direct_messaging
1340 .mark_connected(agent.agent_id, verified_machine_id)
1341 .await;
1342 let family = if addr.is_ipv4() { "v4" } else { "v6" };
1343 tracing::info!(
1344 target: "x0x::connect",
1345 stage = "connect_to_agent",
1346 %agent_prefix,
1347 strategy = "hinted_peer",
1348 outcome = "coordinated",
1349 selected_addr = %addr,
1350 family,
1351 dur_ms = call_start.elapsed().as_millis() as u64,
1352 "hinted peer dial succeeded"
1353 );
1354 return Ok(connectivity::ConnectOutcome::Coordinated(addr));
1355 }
1356 Ok(Err(e)) => {
1357 tracing::debug!(
1358 target: "x0x::connect",
1359 %agent_prefix,
1360 strategy = "hinted_peer",
1361 error = %e,
1362 "hinted peer dial failed"
1363 );
1364 }
1365 Err(_) => {
1366 tracing::debug!(
1367 target: "x0x::connect",
1368 %agent_prefix,
1369 strategy = "hinted_peer",
1370 timeout_s = dial_timeout.as_secs(),
1371 "hinted peer dial timed out"
1372 );
1373 }
1374 }
1375 }
1376
1377 if info.should_attempt_direct() {
1381 for addr in &info.addresses {
1382 match tokio::time::timeout(dial_timeout, network.connect_addr(*addr)).await {
1383 Ok(Ok(connected_peer_id)) => {
1384 let real_machine_id = identity::MachineId(connected_peer_id.0);
1387 if let Some(ref bc) = self.bootstrap_cache {
1389 bc.add_from_connection(connected_peer_id, vec![*addr], None)
1390 .await;
1391 }
1392 {
1394 let mut cache = self.identity_discovery_cache.write().await;
1395 if let Some(entry) = cache.get_mut(agent_id) {
1396 entry.machine_id = real_machine_id;
1397 }
1398 }
1399 self.direct_messaging
1401 .mark_connected(agent.agent_id, real_machine_id)
1402 .await;
1403 let family = if addr.is_ipv4() { "v4" } else { "v6" };
1404 tracing::info!(
1405 target: "x0x::connect",
1406 stage = "connect_to_agent",
1407 %agent_prefix,
1408 strategy = "direct_per_addr",
1409 outcome = "direct",
1410 selected_addr = %addr,
1411 family,
1412 dur_ms = call_start.elapsed().as_millis() as u64,
1413 "direct dial succeeded"
1414 );
1415 return Ok(connectivity::ConnectOutcome::Direct(*addr));
1416 }
1417 Ok(Err(e)) => {
1418 tracing::debug!(
1419 target: "x0x::connect",
1420 %agent_prefix,
1421 strategy = "direct_per_addr",
1422 %addr,
1423 error = %e,
1424 "direct dial failed"
1425 );
1426 }
1427 Err(_) => {
1428 tracing::debug!(
1429 target: "x0x::connect",
1430 %agent_prefix,
1431 strategy = "direct_per_addr",
1432 %addr,
1433 timeout_s = dial_timeout.as_secs(),
1434 "direct dial timed out"
1435 );
1436 }
1437 }
1438 }
1439 }
1440
1441 if info.needs_coordination() || !info.should_attempt_direct() {
1446 let peer_id_hint = ant_quic::PeerId(agent.machine_id.0);
1450 let hint_was_zeroed = agent.machine_id.0 == [0u8; 32];
1451 self.seed_transport_peer_hints_for_target(network, &agent)
1452 .await
1453 .map_err(|e| {
1454 error::IdentityError::Storage(std::io::Error::other(format!(
1455 "failed to seed transport peer hints: {e}"
1456 )))
1457 })?;
1458 let coordinated_result = tokio::time::timeout(
1459 dial_timeout,
1460 network.connect_peer_with_addrs(peer_id_hint, info.addresses.clone()),
1461 )
1462 .await;
1463 match coordinated_result {
1464 Ok(Ok((addr, verified_peer_id))) => {
1465 let verified_machine_id = identity::MachineId(verified_peer_id.0);
1466
1467 if !hint_was_zeroed {
1473 if let Some(ref bc) = self.bootstrap_cache {
1474 bc.add_from_connection(verified_peer_id, vec![addr], None)
1475 .await;
1476 bc.record_success(&verified_peer_id, 0).await;
1477 }
1478 {
1479 let mut cache = self.identity_discovery_cache.write().await;
1480 if let Some(entry) = cache.get_mut(agent_id) {
1481 entry.machine_id = verified_machine_id;
1482 }
1483 }
1484 }
1485
1486 if !hint_was_zeroed {
1493 self.direct_messaging
1494 .mark_connected(agent.agent_id, verified_machine_id)
1495 .await;
1496 }
1497 let family = if addr.is_ipv4() { "v4" } else { "v6" };
1498 tracing::info!(
1499 target: "x0x::connect",
1500 stage = "connect_to_agent",
1501 %agent_prefix,
1502 strategy = "coordinated_fallback",
1503 outcome = "coordinated",
1504 selected_addr = %addr,
1505 family,
1506 hint_was_zeroed,
1507 dur_ms = call_start.elapsed().as_millis() as u64,
1508 "coordinated dial succeeded"
1509 );
1510 return Ok(connectivity::ConnectOutcome::Coordinated(addr));
1511 }
1512 Ok(Err(e)) => {
1513 tracing::debug!(
1514 target: "x0x::connect",
1515 %agent_prefix,
1516 strategy = "coordinated_fallback",
1517 error = %e,
1518 "coordinated dial failed"
1519 );
1520 }
1521 Err(_) => {
1522 tracing::debug!(
1523 target: "x0x::connect",
1524 %agent_prefix,
1525 strategy = "coordinated_fallback",
1526 timeout_s = dial_timeout.as_secs(),
1527 "coordinated dial timed out"
1528 );
1529 }
1530 }
1531 }
1532
1533 tracing::warn!(
1534 target: "x0x::connect",
1535 stage = "connect_to_agent",
1536 %agent_prefix,
1537 outcome = "unreachable",
1538 reason = "all_strategies_exhausted",
1539 dur_ms = call_start.elapsed().as_millis() as u64,
1540 v4_addrs,
1541 v6_addrs,
1542 "all connection strategies exhausted"
1543 );
1544 Ok(connectivity::ConnectOutcome::Unreachable)
1545 }
1546
1547 pub async fn shutdown(&self) {
1553 if let Some(ref pw) = self.presence {
1555 pw.shutdown().await;
1556 tracing::info!("Presence system shut down");
1557 }
1558
1559 if let Some(ref cache) = self.bootstrap_cache {
1560 if let Err(e) = cache.save().await {
1561 tracing::warn!("Failed to save bootstrap cache on shutdown: {e}");
1562 } else {
1563 tracing::info!("Bootstrap cache saved on shutdown");
1564 }
1565 }
1566 }
1567
1568 pub async fn send_direct(
1610 &self,
1611 to: &identity::AgentId,
1612 payload: Vec<u8>,
1613 ) -> Result<dm::DmReceipt, dm::DmError> {
1614 self.send_direct_with_config(to, payload, dm::DmSendConfig::default())
1615 .await
1616 }
1617
1618 pub async fn send_direct_with_config(
1624 &self,
1625 to: &identity::AgentId,
1626 payload: Vec<u8>,
1627 config: dm::DmSendConfig,
1628 ) -> Result<dm::DmReceipt, dm::DmError> {
1629 let cap = self.capability_store.lookup(to);
1630 let gossip_ok = cap
1631 .as_ref()
1632 .map(|c| c.gossip_inbox && !c.kem_public_key.is_empty())
1633 .unwrap_or(false);
1634
1635 if gossip_ok {
1636 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1637 dm::DmError::LocalGossipUnavailable(
1638 "send_direct: no gossip runtime configured".to_string(),
1639 )
1640 })?;
1641 let signing = gossip::SigningContext::from_keypair(self.identity.agent_keypair());
1642 let kem_pub = cap
1643 .as_ref()
1644 .map(|c| c.kem_public_key.clone())
1645 .unwrap_or_default();
1646 return dm_send::send_via_gossip(
1647 std::sync::Arc::clone(runtime.pubsub()),
1648 &signing,
1649 self.identity.agent_id(),
1650 self.identity.machine_id(),
1651 *to,
1652 &kem_pub,
1653 payload,
1654 &config,
1655 std::sync::Arc::clone(&self.dm_inflight_acks),
1656 )
1657 .await;
1658 }
1659
1660 if config.require_gossip {
1661 return Err(dm::DmError::RecipientKeyUnavailable(format!(
1662 "recipient {} has no gossip DM capability advert",
1663 hex::encode(to.as_bytes())
1664 )));
1665 }
1666
1667 self.send_direct_raw_quic(to, payload)
1668 .await
1669 .map(|_| dm_send::raw_quic_receipt())
1670 .map_err(|e| match e {
1671 error::NetworkError::AgentNotFound(_)
1672 | error::NetworkError::AgentNotConnected(_) => {
1673 dm::DmError::RecipientKeyUnavailable(e.to_string())
1674 }
1675 other => dm::DmError::PublishFailed(other.to_string()),
1676 })
1677 }
1678
1679 async fn send_direct_raw_quic(
1681 &self,
1682 agent_id: &identity::AgentId,
1683 payload: Vec<u8>,
1684 ) -> error::NetworkResult<()> {
1685 let send_start = std::time::Instant::now();
1686 let agent_prefix = network::hex_prefix(&agent_id.0, 4);
1687 let self_prefix = network::hex_prefix(&self.identity.agent_id().0, 4);
1688 let bytes = payload.len();
1689
1690 let network = self.network.as_ref().ok_or_else(|| {
1691 tracing::warn!(
1692 target: "x0x::direct",
1693 stage = "send",
1694 %agent_prefix,
1695 outcome = "err_no_network",
1696 "network not initialised"
1697 );
1698 error::NetworkError::NodeCreation("network not initialized".to_string())
1699 })?;
1700
1701 let cached_machine_id = {
1706 let cache = self.identity_discovery_cache.read().await;
1707 cache
1708 .get(agent_id)
1709 .map(|d| d.machine_id)
1710 .filter(|m| m.0 != [0u8; 32]) };
1712 let registry_machine_id = self.direct_messaging.get_machine_id(agent_id).await;
1713
1714 let (machine_id, resolution) = match (cached_machine_id, registry_machine_id) {
1715 (Some(id), _) if network.is_connected(&ant_quic::PeerId(id.0)).await => {
1716 (id, "cached_connected")
1717 }
1718 (_, Some(id)) if network.is_connected(&ant_quic::PeerId(id.0)).await => {
1719 if cached_machine_id != Some(id) {
1720 let mut cache = self.identity_discovery_cache.write().await;
1721 if let Some(entry) = cache.get_mut(agent_id) {
1722 entry.machine_id = id;
1723 }
1724 }
1725 (id, "registry_connected")
1726 }
1727 (Some(id), None) => (id, "cached_not_connected"),
1728 (Some(id), Some(_)) => (id, "cached_both_disconnected"),
1729 (None, Some(id)) => (id, "registry_not_connected"),
1730 (None, None) => {
1731 tracing::debug!(
1732 target: "x0x::direct",
1733 stage = "send",
1734 %agent_prefix,
1735 resolution = "last_resort_connect",
1736 "no machine_id known; triggering connect_to_agent"
1737 );
1738 let _ = self.connect_to_agent(agent_id).await;
1739 let id = self
1740 .direct_messaging
1741 .get_machine_id(agent_id)
1742 .await
1743 .ok_or_else(|| {
1744 tracing::warn!(
1745 target: "x0x::direct",
1746 stage = "send",
1747 %agent_prefix,
1748 outcome = "err_agent_not_found",
1749 dur_ms = send_start.elapsed().as_millis() as u64,
1750 "no machine_id after connect_to_agent"
1751 );
1752 error::NetworkError::AgentNotFound(agent_id.0)
1753 })?;
1754 (id, "post_connect")
1755 }
1756 };
1757
1758 let ant_peer_id = ant_quic::PeerId(machine_id.0);
1760 let machine_prefix = network::hex_prefix(&machine_id.0, 4);
1761 if !network.is_connected(&ant_peer_id).await {
1762 tracing::warn!(
1763 target: "x0x::direct",
1764 stage = "send",
1765 %agent_prefix,
1766 %machine_prefix,
1767 resolution,
1768 outcome = "err_not_connected",
1769 bytes,
1770 dur_ms = send_start.elapsed().as_millis() as u64,
1771 "machine_id resolved but peer not currently connected"
1772 );
1773 return Err(error::NetworkError::AgentNotConnected(agent_id.0));
1774 }
1775
1776 match network
1778 .send_direct(&ant_peer_id, &self.identity.agent_id().0, &payload)
1779 .await
1780 {
1781 Ok(()) => {
1782 tracing::info!(
1783 target: "x0x::direct",
1784 stage = "send",
1785 from = %self_prefix,
1786 to = %agent_prefix,
1787 %machine_prefix,
1788 resolution,
1789 bytes,
1790 dur_ms = send_start.elapsed().as_millis() as u64,
1791 outcome = "ok",
1792 "direct message sent"
1793 );
1794 Ok(())
1795 }
1796 Err(e) => {
1797 tracing::warn!(
1798 target: "x0x::direct",
1799 stage = "send",
1800 from = %self_prefix,
1801 to = %agent_prefix,
1802 %machine_prefix,
1803 resolution,
1804 bytes,
1805 dur_ms = send_start.elapsed().as_millis() as u64,
1806 outcome = "err_transport",
1807 error = %e,
1808 "transport send_direct failed"
1809 );
1810 Err(e)
1811 }
1812 }
1813 }
1814
1815 pub async fn recv_direct(&self) -> Option<direct::DirectMessage> {
1840 self.recv_direct_inner().await
1841 }
1842
1843 pub async fn recv_direct_annotated(&self) -> Option<direct::DirectMessage> {
1869 self.recv_direct_inner().await
1870 }
1871
1872 async fn recv_direct_inner(&self) -> Option<direct::DirectMessage> {
1879 self.direct_messaging.recv().await
1880 }
1881
1882 pub fn subscribe_direct(&self) -> direct::DirectMessageReceiver {
1898 self.direct_messaging.subscribe()
1899 }
1900
1901 pub fn direct_messaging(&self) -> &std::sync::Arc<direct::DirectMessaging> {
1905 &self.direct_messaging
1906 }
1907
1908 pub async fn is_agent_connected(&self, agent_id: &identity::AgentId) -> bool {
1918 let Some(network) = &self.network else {
1919 return false;
1920 };
1921
1922 let machine_id = {
1924 let cache = self.identity_discovery_cache.read().await;
1925 cache.get(agent_id).map(|d| d.machine_id)
1926 };
1927
1928 match machine_id {
1929 Some(mid) => {
1930 let ant_peer_id = ant_quic::PeerId(mid.0);
1931 network.is_connected(&ant_peer_id).await
1932 }
1933 None => false,
1934 }
1935 }
1936
1937 pub async fn connected_agents(&self) -> Vec<identity::AgentId> {
1942 let Some(network) = &self.network else {
1943 return Vec::new();
1944 };
1945
1946 let connected_peers = network.connected_peers().await;
1947 let cache = self.identity_discovery_cache.read().await;
1948
1949 cache
1951 .values()
1952 .filter(|agent| {
1953 let ant_peer_id = ant_quic::PeerId(agent.machine_id.0);
1954 connected_peers.contains(&ant_peer_id)
1955 })
1956 .map(|agent| agent.agent_id)
1957 .collect()
1958 }
1959
1960 pub fn set_contacts(&self, store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>) {
1968 if let Some(runtime) = &self.gossip_runtime {
1969 runtime.pubsub().set_contacts(store);
1970 }
1971 }
1972
1973 pub async fn announce_identity(
1991 &self,
1992 include_user_identity: bool,
1993 human_consent: bool,
1994 ) -> error::Result<()> {
1995 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1996 error::IdentityError::Storage(std::io::Error::other(
1997 "gossip runtime not initialized - configure agent with network first",
1998 ))
1999 })?;
2000
2001 self.start_identity_listener().await?;
2002
2003 let mut addresses = if let Some(network) = self.network.as_ref() {
2005 match network.node_status().await {
2006 Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
2007 _ => match network.routable_addr().await {
2008 Some(addr) => vec![addr],
2009 None => self.announcement_addresses(),
2010 },
2011 }
2012 } else {
2013 self.announcement_addresses()
2014 };
2015 let bind_port = if let Some(network) = self.network.as_ref() {
2024 network.bound_addr().await.map(|a| a.port()).unwrap_or(5483)
2025 } else {
2026 5483
2027 };
2028
2029 if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
2031 if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
2032 if let Ok(local) = sock.local_addr() {
2033 if let std::net::IpAddr::V6(v6) = local.ip() {
2034 let segs = v6.segments();
2035 let is_global = (segs[0] & 0xffc0) != 0xfe80
2036 && (segs[0] & 0xff00) != 0xfd00
2037 && !v6.is_loopback();
2038 if is_global {
2039 let v6_addr =
2040 std::net::SocketAddr::new(std::net::IpAddr::V6(v6), bind_port);
2041 if !addresses.contains(&v6_addr) {
2042 addresses.push(v6_addr);
2043 }
2044 }
2045 }
2046 }
2047 }
2048 }
2049
2050 for addr in collect_local_interface_addrs(bind_port) {
2051 if !addresses.contains(&addr) {
2052 addresses.push(addr);
2053 }
2054 }
2055
2056 addresses.retain(|a| is_publicly_advertisable(*a));
2060
2061 let announcement = self.build_identity_announcement_with_addrs(
2062 include_user_identity,
2063 human_consent,
2064 addresses,
2065 )?;
2066
2067 let encoded = bincode::serialize(&announcement).map_err(|e| {
2068 error::IdentityError::Serialization(format!(
2069 "failed to serialize identity announcement: {e}"
2070 ))
2071 })?;
2072
2073 let payload = bytes::Bytes::from(encoded);
2074
2075 let shard_topic = shard_topic_for_agent(&announcement.agent_id);
2077 runtime
2078 .pubsub()
2079 .publish(shard_topic, payload.clone())
2080 .await
2081 .map_err(|e| {
2082 error::IdentityError::Storage(std::io::Error::other(format!(
2083 "failed to publish identity announcement to shard topic: {e}"
2084 )))
2085 })?;
2086
2087 runtime
2089 .pubsub()
2090 .publish(IDENTITY_ANNOUNCE_TOPIC.to_string(), payload)
2091 .await
2092 .map_err(|e| {
2093 error::IdentityError::Storage(std::io::Error::other(format!(
2094 "failed to publish identity announcement: {e}"
2095 )))
2096 })?;
2097
2098 let now = Self::unix_timestamp_secs();
2099 self.identity_discovery_cache.write().await.insert(
2100 announcement.agent_id,
2101 DiscoveredAgent {
2102 agent_id: announcement.agent_id,
2103 machine_id: announcement.machine_id,
2104 user_id: announcement.user_id,
2105 addresses: announcement.addresses.clone(),
2106 announced_at: announcement.announced_at,
2107 last_seen: now,
2108 machine_public_key: announcement.machine_public_key.clone(),
2109 nat_type: announcement.nat_type.clone(),
2110 can_receive_direct: announcement.can_receive_direct,
2111 is_relay: announcement.is_relay,
2112 is_coordinator: announcement.is_coordinator,
2113 },
2114 );
2115
2116 if include_user_identity && human_consent {
2119 self.user_identity_consented
2120 .store(true, std::sync::atomic::Ordering::Release);
2121 }
2122
2123 Ok(())
2124 }
2125
2126 pub async fn discovered_agents(&self) -> error::Result<Vec<DiscoveredAgent>> {
2132 self.start_identity_listener().await?;
2133 let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
2134 let mut agents: Vec<_> = self
2135 .identity_discovery_cache
2136 .read()
2137 .await
2138 .values()
2139 .filter(|a| a.announced_at >= cutoff)
2140 .cloned()
2141 .collect();
2142 agents.sort_by_key(|a| a.agent_id.0);
2143 Ok(agents)
2144 }
2145
2146 pub async fn discovered_agents_unfiltered(&self) -> error::Result<Vec<DiscoveredAgent>> {
2155 self.start_identity_listener().await?;
2156 let mut agents: Vec<_> = self
2157 .identity_discovery_cache
2158 .read()
2159 .await
2160 .values()
2161 .cloned()
2162 .collect();
2163 agents.sort_by_key(|a| a.agent_id.0);
2164 Ok(agents)
2165 }
2166
2167 pub async fn discovered_agent(
2173 &self,
2174 agent_id: identity::AgentId,
2175 ) -> error::Result<Option<DiscoveredAgent>> {
2176 self.start_identity_listener().await?;
2177 Ok(self
2178 .identity_discovery_cache
2179 .read()
2180 .await
2181 .get(&agent_id)
2182 .cloned())
2183 }
2184
2185 async fn start_identity_listener(&self) -> error::Result<()> {
2186 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2187 error::IdentityError::Storage(std::io::Error::other(
2188 "gossip runtime not initialized - configure agent with network first",
2189 ))
2190 })?;
2191
2192 if self
2193 .identity_listener_started
2194 .swap(true, std::sync::atomic::Ordering::AcqRel)
2195 {
2196 return Ok(());
2197 }
2198
2199 let mut sub_legacy = runtime
2200 .pubsub()
2201 .subscribe(IDENTITY_ANNOUNCE_TOPIC.to_string())
2202 .await;
2203 let own_shard_topic = shard_topic_for_agent(&self.agent_id());
2204 let mut sub_shard = runtime.pubsub().subscribe(own_shard_topic).await;
2205 let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
2206 let bootstrap_cache = self.bootstrap_cache.clone();
2207 let contact_store = std::sync::Arc::clone(&self.contact_store);
2208 let direct_messaging = std::sync::Arc::clone(&self.direct_messaging);
2209 let network = self.network.as_ref().map(std::sync::Arc::clone);
2210 let own_agent_id = self.agent_id();
2211 let rebroadcast_pubsub = std::sync::Arc::clone(runtime.pubsub());
2212
2213 tokio::spawn(async move {
2214 let mut auto_connect_attempted = std::collections::HashSet::<identity::AgentId>::new();
2217
2218 let mut rebroadcast_state: std::collections::HashMap<
2224 (identity::AgentId, u64),
2225 std::time::Instant,
2226 > = std::collections::HashMap::new();
2227 const REBROADCAST_MIN_INTERVAL: std::time::Duration =
2228 std::time::Duration::from_secs(20);
2229
2230 loop {
2231 let msg = tokio::select! {
2233 Some(m) = sub_legacy.recv() => m,
2234 Some(m) = sub_shard.recv() => m,
2235 else => break,
2236 };
2237 let decoded = {
2238 use bincode::Options;
2239 bincode::options()
2240 .with_fixint_encoding()
2241 .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
2242 .allow_trailing_bytes()
2243 .deserialize::<IdentityAnnouncement>(&msg.payload)
2244 };
2245 let raw_payload = msg.payload.clone();
2246 let announcement = match decoded {
2247 Ok(a) => a,
2248 Err(e) => {
2249 tracing::debug!("Ignoring invalid identity announcement payload: {}", e);
2250 continue;
2251 }
2252 };
2253
2254 if let Err(e) = announcement.verify() {
2255 tracing::warn!("Ignoring unverifiable identity announcement: {}", e);
2256 continue;
2257 }
2258
2259 {
2262 let store = contact_store.read().await;
2263 let evaluator = trust::TrustEvaluator::new(&store);
2264 let decision = evaluator.evaluate(&trust::TrustContext {
2265 agent_id: &announcement.agent_id,
2266 machine_id: &announcement.machine_id,
2267 });
2268 match decision {
2269 trust::TrustDecision::RejectBlocked => {
2270 tracing::debug!(
2271 "Dropping identity announcement from blocked agent {:?}",
2272 hex::encode(&announcement.agent_id.0[..8]),
2273 );
2274 continue;
2275 }
2276 trust::TrustDecision::RejectMachineMismatch => {
2277 tracing::warn!(
2278 "Dropping identity announcement from agent {:?}: machine {:?} not in pinned list",
2279 hex::encode(&announcement.agent_id.0[..8]),
2280 hex::encode(&announcement.machine_id.0[..8]),
2281 );
2282 continue;
2283 }
2284 _ => {}
2285 }
2286 }
2287
2288 {
2290 let mut store = contact_store.write().await;
2291 let record = contacts::MachineRecord::new(announcement.machine_id, None);
2292 store.add_machine(&announcement.agent_id, record);
2293 }
2294
2295 let now = std::time::SystemTime::now()
2296 .duration_since(std::time::UNIX_EPOCH)
2297 .map_or(0, |d| d.as_secs());
2298
2299 {
2305 let public_addrs: Vec<std::net::SocketAddr> = announcement
2306 .addresses
2307 .iter()
2308 .copied()
2309 .filter(|a| is_globally_routable(a.ip()))
2310 .collect();
2311 if !public_addrs.is_empty() {
2312 if let Some(ref bc) = &bootstrap_cache {
2313 let peer_id = ant_quic::PeerId(announcement.machine_id.0);
2314 bc.add_from_connection(peer_id, public_addrs.clone(), None)
2315 .await;
2316 tracing::debug!(
2317 "Added {} public addresses to bootstrap cache for agent {:?} (machine {:?})",
2318 public_addrs.len(),
2319 announcement.agent_id,
2320 hex::encode(&announcement.machine_id.0[..8]),
2321 );
2322 }
2323 }
2324 }
2325
2326 let filtered_addresses: Vec<std::net::SocketAddr> = announcement
2335 .addresses
2336 .iter()
2337 .copied()
2338 .filter(|a| is_publicly_advertisable(*a))
2339 .collect();
2340 cache.write().await.insert(
2341 announcement.agent_id,
2342 DiscoveredAgent {
2343 agent_id: announcement.agent_id,
2344 machine_id: announcement.machine_id,
2345 user_id: announcement.user_id,
2346 addresses: filtered_addresses,
2347 announced_at: announcement.announced_at,
2348 last_seen: now,
2349 machine_public_key: announcement.machine_public_key.clone(),
2350 nat_type: announcement.nat_type.clone(),
2351 can_receive_direct: announcement.can_receive_direct,
2352 is_relay: announcement.is_relay,
2353 is_coordinator: announcement.is_coordinator,
2354 },
2355 );
2356
2357 direct_messaging
2361 .register_agent(announcement.agent_id, announcement.machine_id)
2362 .await;
2363
2364 if announcement.agent_id != own_agent_id {
2374 let key = (announcement.agent_id, announcement.announced_at);
2375 let should_forward = match rebroadcast_state.get(&key) {
2376 None => true,
2377 Some(last) => last.elapsed() >= REBROADCAST_MIN_INTERVAL,
2378 };
2379 if should_forward {
2380 rebroadcast_state.insert(key, std::time::Instant::now());
2381 if rebroadcast_state.len() > 1024 {
2383 let cutoff =
2384 std::time::Instant::now() - std::time::Duration::from_secs(3600);
2385 rebroadcast_state.retain(|_, t| *t >= cutoff);
2386 }
2387 let pubsub = std::sync::Arc::clone(&rebroadcast_pubsub);
2388 let payload = raw_payload.clone();
2389 tokio::spawn(async move {
2390 if let Err(e) = pubsub
2391 .publish(IDENTITY_ANNOUNCE_TOPIC.to_string(), payload)
2392 .await
2393 {
2394 tracing::debug!("identity announcement re-broadcast failed: {e}");
2395 }
2396 });
2397 }
2398 }
2399
2400 if let Some(ref net) = &network {
2404 let ant_peer_id = ant_quic::PeerId(announcement.machine_id.0);
2405 if net.is_connected(&ant_peer_id).await {
2406 direct_messaging
2407 .mark_connected(announcement.agent_id, announcement.machine_id)
2408 .await;
2409 }
2410 }
2411
2412 if announcement.agent_id != own_agent_id
2417 && !announcement.addresses.is_empty()
2418 && !auto_connect_attempted.contains(&announcement.agent_id)
2419 {
2420 if let Some(ref net) = &network {
2421 let ant_peer = ant_quic::PeerId(announcement.machine_id.0);
2422 if !net.is_connected(&ant_peer).await {
2423 auto_connect_attempted.insert(announcement.agent_id);
2424 let net = std::sync::Arc::clone(net);
2425 let addresses = announcement.addresses.clone();
2426 tokio::spawn(async move {
2427 for addr in &addresses {
2428 match net.connect_addr(*addr).await {
2429 Ok(_) => {
2430 tracing::info!(
2431 "Auto-connected to discovered agent at {addr}",
2432 );
2433 return;
2434 }
2435 Err(e) => {
2436 tracing::debug!("Auto-connect to {addr} failed: {e}",);
2437 }
2438 }
2439 }
2440 tracing::debug!(
2441 "Auto-connect exhausted all {} addresses for discovered agent",
2442 addresses.len(),
2443 );
2444 });
2445 }
2446 }
2447 }
2448 }
2449 });
2450
2451 Ok(())
2452 }
2453
2454 fn unix_timestamp_secs() -> u64 {
2455 std::time::SystemTime::now()
2456 .duration_since(std::time::UNIX_EPOCH)
2457 .map_or(0, |d| d.as_secs())
2458 }
2459
2460 fn announcement_addresses(&self) -> Vec<std::net::SocketAddr> {
2461 match self.network.as_ref().and_then(|n| n.local_addr()) {
2462 Some(addr) if addr.port() > 0 => collect_local_interface_addrs(addr.port()),
2463 _ => Vec::new(),
2464 }
2465 }
2466
2467 fn build_identity_announcement(
2468 &self,
2469 include_user_identity: bool,
2470 human_consent: bool,
2471 ) -> error::Result<IdentityAnnouncement> {
2472 self.build_identity_announcement_with_addrs(
2473 include_user_identity,
2474 human_consent,
2475 self.announcement_addresses(),
2476 )
2477 }
2478
2479 fn build_identity_announcement_with_addrs(
2480 &self,
2481 include_user_identity: bool,
2482 human_consent: bool,
2483 addresses: Vec<std::net::SocketAddr>,
2484 ) -> error::Result<IdentityAnnouncement> {
2485 if include_user_identity && !human_consent {
2486 return Err(error::IdentityError::Storage(std::io::Error::other(
2487 "human identity disclosure requires explicit human consent — set human_consent: true in the request body",
2488 )));
2489 }
2490
2491 let (user_id, agent_certificate) = if include_user_identity {
2492 let user_id = self.user_id().ok_or_else(|| {
2493 error::IdentityError::Storage(std::io::Error::other(
2494 "human identity disclosure requested but no user identity is configured — set user_key_path in your config.toml to point at your user keypair file",
2495 ))
2496 })?;
2497 let cert = self.agent_certificate().cloned().ok_or_else(|| {
2498 error::IdentityError::Storage(std::io::Error::other(
2499 "human identity disclosure requested but agent certificate is missing",
2500 ))
2501 })?;
2502 (Some(user_id), Some(cert))
2503 } else {
2504 (None, None)
2505 };
2506
2507 let machine_public_key = self
2508 .identity
2509 .machine_keypair()
2510 .public_key()
2511 .as_bytes()
2512 .to_vec();
2513
2514 let unsigned = IdentityAnnouncementUnsigned {
2518 agent_id: self.agent_id(),
2519 machine_id: self.machine_id(),
2520 user_id,
2521 agent_certificate: agent_certificate.clone(),
2522 machine_public_key: machine_public_key.clone(),
2523 addresses,
2524 announced_at: Self::unix_timestamp_secs(),
2525 nat_type: None,
2526 can_receive_direct: None,
2527 is_relay: None,
2528 is_coordinator: None,
2529 };
2530 let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
2531 error::IdentityError::Serialization(format!(
2532 "failed to serialize unsigned identity announcement: {e}"
2533 ))
2534 })?;
2535 let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
2536 self.identity.machine_keypair().secret_key(),
2537 &unsigned_bytes,
2538 )
2539 .map_err(|e| {
2540 error::IdentityError::Storage(std::io::Error::other(format!(
2541 "failed to sign identity announcement with machine key: {:?}",
2542 e
2543 )))
2544 })?
2545 .as_bytes()
2546 .to_vec();
2547
2548 Ok(IdentityAnnouncement {
2549 agent_id: unsigned.agent_id,
2550 machine_id: unsigned.machine_id,
2551 user_id: unsigned.user_id,
2552 agent_certificate: unsigned.agent_certificate,
2553 machine_public_key,
2554 machine_signature,
2555 addresses: unsigned.addresses,
2556 announced_at: unsigned.announced_at,
2557 nat_type: unsigned.nat_type,
2558 can_receive_direct: unsigned.can_receive_direct,
2559 is_relay: unsigned.is_relay,
2560 is_coordinator: unsigned.is_coordinator,
2561 })
2562 }
2563
2564 pub async fn join_network(&self) -> error::Result<()> {
2573 let Some(network) = self.network.as_ref() else {
2574 tracing::debug!("join_network called but no network configured");
2575 return Ok(());
2576 };
2577
2578 if let Some(ref runtime) = self.gossip_runtime {
2579 runtime.start().await.map_err(|e| {
2580 error::IdentityError::Storage(std::io::Error::other(format!(
2581 "failed to start gossip runtime: {e}"
2582 )))
2583 })?;
2584 tracing::info!("Gossip runtime started");
2585 }
2586 self.start_identity_listener().await?;
2587 self.start_network_event_listener();
2588 self.start_direct_listener();
2589
2590 let bootstrap_nodes = network.config().bootstrap_nodes.clone();
2591
2592 let min_connected = 3;
2593 let mut all_connected: Vec<std::net::SocketAddr> = Vec::new();
2594
2595 if let Some(ref cache) = self.bootstrap_cache {
2603 let coordinators = cache.select_coordinators(6).await;
2604 let coordinator_addrs: Vec<std::net::SocketAddr> = coordinators
2605 .iter()
2606 .flat_map(|peer| peer.preferred_addresses())
2607 .collect();
2608
2609 if !coordinator_addrs.is_empty() {
2610 tracing::info!(
2611 "Phase 0: Trying {} addresses from {} cached coordinators",
2612 coordinator_addrs.len(),
2613 coordinators.len()
2614 );
2615 let (succeeded, _failed) = self
2616 .connect_peers_parallel_tracked(network, &coordinator_addrs)
2617 .await;
2618 all_connected.extend(&succeeded);
2619 tracing::info!(
2620 "Phase 0: {}/{} coordinator addresses connected",
2621 succeeded.len(),
2622 coordinator_addrs.len()
2623 );
2624 }
2625 }
2626
2627 if all_connected.len() < min_connected {
2629 if let Some(ref cache) = self.bootstrap_cache {
2630 const PHASE1_PEER_CANDIDATES: usize = 12;
2631 let cached_peers = cache.select_peers(PHASE1_PEER_CANDIDATES).await;
2632 if !cached_peers.is_empty() {
2633 tracing::info!("Phase 1: Trying {} cached peers", cached_peers.len());
2634 let (succeeded, _failed) = self
2635 .connect_cached_peers_parallel_tracked(network, &cached_peers)
2636 .await;
2637 all_connected.extend(&succeeded);
2638 tracing::info!(
2639 "Phase 1: {}/{} cached peers connected",
2640 succeeded.len(),
2641 cached_peers.len()
2642 );
2643 }
2644 }
2645 } if all_connected.len() < min_connected && !bootstrap_nodes.is_empty() {
2650 let remaining: Vec<std::net::SocketAddr> = bootstrap_nodes
2651 .iter()
2652 .filter(|addr| !all_connected.contains(addr))
2653 .copied()
2654 .collect();
2655
2656 let (succeeded, mut failed) = self
2658 .connect_peers_parallel_tracked(network, &remaining)
2659 .await;
2660 all_connected.extend(&succeeded);
2661 tracing::info!(
2662 "Phase 2 round 1: {}/{} bootstrap peers connected",
2663 succeeded.len(),
2664 remaining.len()
2665 );
2666
2667 for round in 2..=3 {
2669 if failed.is_empty() {
2670 break;
2671 }
2672 let delay = std::time::Duration::from_secs(if round == 2 { 10 } else { 15 });
2673 tracing::info!(
2674 "Retrying {} failed peers in {}s (round {})",
2675 failed.len(),
2676 delay.as_secs(),
2677 round
2678 );
2679 tokio::time::sleep(delay).await;
2680
2681 let (succeeded, still_failed) =
2682 self.connect_peers_parallel_tracked(network, &failed).await;
2683 all_connected.extend(&succeeded);
2684 failed = still_failed;
2685 tracing::info!(
2686 "Phase 2 round {}: {} total peers connected",
2687 round,
2688 all_connected.len()
2689 );
2690 }
2691
2692 if !failed.is_empty() {
2693 tracing::warn!(
2694 "Could not connect to {} bootstrap peers: {:?}",
2695 failed.len(),
2696 failed
2697 );
2698 }
2699 }
2700
2701 tracing::info!(
2702 "Network join complete. Connected to {} peers.",
2703 all_connected.len()
2704 );
2705
2706 if let Some(ref runtime) = self.gossip_runtime {
2708 let seeds: Vec<String> = all_connected.iter().map(|addr| addr.to_string()).collect();
2709 if !seeds.is_empty() {
2710 if let Err(e) = runtime.membership().join(seeds).await {
2711 tracing::warn!("HyParView membership join failed: {e}");
2712 }
2713 }
2714 }
2715
2716 if let Some(ref pw) = self.presence {
2718 if let Some(ref runtime) = self.gossip_runtime {
2720 let active = runtime.membership().active_view();
2721 for peer in active {
2722 pw.manager().add_broadcast_peer(peer).await;
2723 }
2724 tracing::info!(
2725 "Presence seeded with {} broadcast peers",
2726 pw.manager().broadcast_peer_count().await
2727 );
2728 }
2729
2730 if let Some(ref net) = self.network {
2737 if let Some(status) = net.node_status().await {
2738 let hints: Vec<String> = status
2739 .external_addrs
2740 .iter()
2741 .filter(|a| is_publicly_advertisable(**a))
2742 .map(|a| a.to_string())
2743 .collect();
2744 pw.manager().set_addr_hints(hints).await;
2745 }
2746 }
2747
2748 if pw.config().enable_beacons {
2749 if let Err(e) = pw
2750 .manager()
2751 .start_beacons(pw.config().beacon_interval_secs)
2752 .await
2753 {
2754 tracing::warn!("Failed to start presence beacons: {e}");
2755 } else {
2756 tracing::info!(
2757 "Presence beacons started (interval={}s)",
2758 pw.config().beacon_interval_secs
2759 );
2760 }
2761 }
2762
2763 pw.start_event_loop(std::sync::Arc::clone(&self.identity_discovery_cache))
2767 .await;
2768 tracing::debug!("Presence event loop started");
2769 }
2770
2771 if let Err(e) = self.announce_identity(false, false).await {
2772 tracing::warn!("Initial identity announcement failed: {}", e);
2773 }
2774 if let Err(e) = self.start_identity_heartbeat().await {
2775 tracing::warn!("Failed to start identity heartbeat: {e}");
2776 }
2777
2778 if let (Some(ref runtime), Some(ref network)) = (&self.gossip_runtime, &self.network) {
2785 let ctx = HeartbeatContext {
2786 identity: std::sync::Arc::clone(&self.identity),
2787 runtime: std::sync::Arc::clone(runtime),
2788 network: std::sync::Arc::clone(network),
2789 interval_secs: self.heartbeat_interval_secs,
2790 cache: std::sync::Arc::clone(&self.identity_discovery_cache),
2791 user_identity_consented: std::sync::Arc::clone(&self.user_identity_consented),
2792 };
2793 tokio::spawn(async move {
2794 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
2795 if let Err(e) = ctx.announce().await {
2796 tracing::warn!("Delayed identity re-announcement failed: {e}");
2797 } else {
2798 tracing::info!(
2799 "Delayed identity re-announcement sent (gossip mesh stabilized)"
2800 );
2801 }
2802 });
2803 }
2804
2805 if let Some(ref runtime) = self.gossip_runtime {
2809 let signing = std::sync::Arc::new(gossip::SigningContext::from_keypair(
2810 self.identity.agent_keypair(),
2811 ));
2812 let caps_rx = self.dm_capabilities_tx.subscribe();
2813 match dm_capability_service::CapabilityAdvertService::spawn_default(
2814 std::sync::Arc::clone(runtime.pubsub()),
2815 signing,
2816 self.identity.agent_id(),
2817 self.identity.machine_id(),
2818 caps_rx,
2819 std::sync::Arc::clone(&self.capability_store),
2820 )
2821 .await
2822 {
2823 Ok(service) => {
2824 let mut guard = self.capability_advert_service.lock().await;
2825 if let Some(prev) = guard.take() {
2826 prev.abort();
2827 }
2828 *guard = Some(service);
2829 tracing::info!("Capability advert service started");
2830 }
2831 Err(e) => tracing::warn!("failed to start capability advert service: {e}"),
2832 }
2833 }
2834
2835 Ok(())
2836 }
2837
2838 #[must_use]
2840 pub fn capability_store(&self) -> std::sync::Arc<dm_capability::CapabilityStore> {
2841 std::sync::Arc::clone(&self.capability_store)
2842 }
2843
2844 #[must_use]
2846 pub fn dm_inflight_acks(&self) -> std::sync::Arc<dm::InFlightAcks> {
2847 std::sync::Arc::clone(&self.dm_inflight_acks)
2848 }
2849
2850 #[must_use]
2852 pub fn recent_delivery_cache(&self) -> std::sync::Arc<dm::RecentDeliveryCache> {
2853 std::sync::Arc::clone(&self.recent_delivery_cache)
2854 }
2855
2856 pub async fn start_dm_inbox(
2863 &self,
2864 kem_keypair: std::sync::Arc<groups::kem_envelope::AgentKemKeypair>,
2865 config: dm_inbox::DmInboxConfig,
2866 ) -> error::Result<()> {
2867 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2868 error::IdentityError::Storage(std::io::Error::other(
2869 "cannot start DM inbox: no gossip runtime configured",
2870 ))
2871 })?;
2872 let signing = std::sync::Arc::new(gossip::SigningContext::from_keypair(
2873 self.identity.agent_keypair(),
2874 ));
2875 let service = dm_inbox::DmInboxService::spawn(
2876 std::sync::Arc::clone(runtime.pubsub()),
2877 signing,
2878 self.identity.agent_id(),
2879 self.identity.machine_id(),
2880 std::sync::Arc::clone(&kem_keypair),
2881 std::sync::Arc::clone(&self.direct_messaging),
2882 std::sync::Arc::clone(&self.contact_store),
2883 std::sync::Arc::clone(&self.dm_inflight_acks),
2884 std::sync::Arc::clone(&self.recent_delivery_cache),
2885 config,
2886 )
2887 .await
2888 .map_err(|e| {
2889 error::IdentityError::Storage(std::io::Error::other(format!(
2890 "DM inbox spawn failed: {e}"
2891 )))
2892 })?;
2893 let mut guard = self.dm_inbox_service.lock().await;
2894 if let Some(prev) = guard.take() {
2895 prev.abort();
2896 }
2897 *guard = Some(service);
2898
2899 let upgraded =
2903 dm::DmCapabilities::pending().with_kem_public_key(kem_keypair.public_bytes.clone());
2904 if self.dm_capabilities_tx.send(upgraded).is_err() {
2905 tracing::debug!("dm_capabilities watch has no receivers; skipping upgrade broadcast");
2906 }
2907 tracing::info!("DM inbox service started");
2908 Ok(())
2909 }
2910
2911 pub async fn stop_dm_inbox(&self) {
2913 let mut guard = self.dm_inbox_service.lock().await;
2914 if let Some(service) = guard.take() {
2915 service.abort();
2916 }
2917 }
2918
2919 async fn connect_cached_peers_parallel_tracked(
2921 &self,
2922 network: &std::sync::Arc<network::NetworkNode>,
2923 peers: &[ant_quic::CachedPeer],
2924 ) -> (Vec<std::net::SocketAddr>, Vec<ant_quic::PeerId>) {
2925 use tokio::time::{timeout, Duration};
2926 const CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
2927
2928 let handles: Vec<_> = peers
2929 .iter()
2930 .map(|peer| {
2931 let net = network.clone();
2932 let peer_id = peer.peer_id;
2933 tokio::spawn(async move {
2934 tracing::debug!("Connecting to cached peer: {:?}", peer_id);
2935 match timeout(CONNECT_TIMEOUT, net.connect_cached_peer(peer_id)).await {
2936 Ok(Ok(addr)) => {
2937 tracing::info!("Connected to cached peer {:?} at {}", peer_id, addr);
2938 Ok(addr)
2939 }
2940 Ok(Err(e)) => {
2941 tracing::warn!("Failed to connect to cached peer {:?}: {}", peer_id, e);
2942 Err(peer_id)
2943 }
2944 Err(_) => {
2945 tracing::warn!(
2946 "Connection to cached peer {:?} timed out after {}s",
2947 peer_id,
2948 CONNECT_TIMEOUT.as_secs()
2949 );
2950 Err(peer_id)
2951 }
2952 }
2953 })
2954 })
2955 .collect();
2956
2957 let mut succeeded = Vec::new();
2958 let mut failed = Vec::new();
2959 for handle in handles {
2960 match handle.await {
2961 Ok(Ok(addr)) => succeeded.push(addr),
2962 Ok(Err(peer_id)) => failed.push(peer_id),
2963 Err(e) => tracing::error!("Connection task panicked: {}", e),
2964 }
2965 }
2966 (succeeded, failed)
2967 }
2968
2969 async fn connect_peers_parallel_tracked(
2971 &self,
2972 network: &std::sync::Arc<network::NetworkNode>,
2973 addrs: &[std::net::SocketAddr],
2974 ) -> (Vec<std::net::SocketAddr>, Vec<std::net::SocketAddr>) {
2975 use tokio::time::{timeout, Duration};
2976
2977 const CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
2980
2981 let handles: Vec<_> = addrs
2982 .iter()
2983 .map(|addr| {
2984 let net = network.clone();
2985 let addr = *addr;
2986 tokio::spawn(async move {
2987 tracing::debug!("Connecting to peer: {}", addr);
2988 match timeout(CONNECT_TIMEOUT, net.connect_addr(addr)).await {
2989 Ok(Ok(_)) => {
2990 tracing::info!("Connected to peer: {}", addr);
2991 Ok(addr)
2992 }
2993 Ok(Err(e)) => {
2994 tracing::warn!("Failed to connect to {}: {}", addr, e);
2995 Err(addr)
2996 }
2997 Err(_) => {
2998 tracing::warn!(
2999 "Connection to {} timed out after {}s",
3000 addr,
3001 CONNECT_TIMEOUT.as_secs()
3002 );
3003 Err(addr)
3004 }
3005 }
3006 })
3007 })
3008 .collect();
3009
3010 let mut succeeded = Vec::new();
3011 let mut failed = Vec::new();
3012 for handle in handles {
3013 match handle.await {
3014 Ok(Ok(addr)) => succeeded.push(addr),
3015 Ok(Err(addr)) => failed.push(addr),
3016 Err(e) => tracing::error!("Connection task panicked: {}", e),
3017 }
3018 }
3019 (succeeded, failed)
3020 }
3021
3022 pub async fn subscribe(&self, topic: &str) -> error::Result<Subscription> {
3032 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3033 error::IdentityError::Storage(std::io::Error::other(
3034 "gossip runtime not initialized - configure agent with network first",
3035 ))
3036 })?;
3037 Ok(runtime.pubsub().subscribe(topic.to_string()).await)
3038 }
3039
3040 pub async fn publish(&self, topic: &str, payload: Vec<u8>) -> error::Result<()> {
3052 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3053 error::IdentityError::Storage(std::io::Error::other(
3054 "gossip runtime not initialized - configure agent with network first",
3055 ))
3056 })?;
3057 runtime
3058 .pubsub()
3059 .publish(topic.to_string(), bytes::Bytes::from(payload))
3060 .await
3061 .map_err(|e| {
3062 error::IdentityError::Storage(std::io::Error::other(format!(
3063 "publish failed: {}",
3064 e
3065 )))
3066 })
3067 }
3068
3069 pub async fn peers(&self) -> error::Result<Vec<saorsa_gossip_types::PeerId>> {
3077 let network = self.network.as_ref().ok_or_else(|| {
3078 error::IdentityError::Storage(std::io::Error::other(
3079 "network not initialized - configure agent with network first",
3080 ))
3081 })?;
3082 let ant_peers = network.connected_peers().await;
3083 Ok(ant_peers
3084 .into_iter()
3085 .map(|p| saorsa_gossip_types::PeerId::new(p.0))
3086 .collect())
3087 }
3088
3089 pub async fn presence(&self) -> error::Result<Vec<identity::AgentId>> {
3097 self.start_identity_listener().await?;
3098 let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
3099 let mut agents: Vec<_> = self
3100 .identity_discovery_cache
3101 .read()
3102 .await
3103 .values()
3104 .filter(|a| a.announced_at >= cutoff)
3105 .map(|a| a.agent_id)
3106 .collect();
3107 agents.sort_by_key(|a| a.0);
3108 Ok(agents)
3109 }
3110
3111 pub async fn subscribe_presence(
3125 &self,
3126 ) -> error::NetworkResult<tokio::sync::broadcast::Receiver<presence::PresenceEvent>> {
3127 let pw = self.presence.as_ref().ok_or_else(|| {
3128 error::NetworkError::NodeError("presence system not initialized".to_string())
3129 })?;
3130 pw.start_event_loop(std::sync::Arc::clone(&self.identity_discovery_cache))
3132 .await;
3133 Ok(pw.subscribe_events())
3134 }
3135
3136 pub async fn cached_agent(&self, id: &identity::AgentId) -> Option<DiscoveredAgent> {
3142 self.identity_discovery_cache.read().await.get(id).cloned()
3143 }
3144
3145 pub async fn is_agent_machine_verified(
3152 &self,
3153 agent_id: &identity::AgentId,
3154 machine_id: &identity::MachineId,
3155 ) -> bool {
3156 let cache = self.identity_discovery_cache.read().await;
3157 cache
3158 .get(agent_id)
3159 .map(|entry| entry.machine_id == *machine_id)
3160 .unwrap_or(false)
3161 }
3162
3163 pub async fn discover_agents_foaf(
3182 &self,
3183 ttl: u8,
3184 timeout_ms: u64,
3185 ) -> error::NetworkResult<Vec<DiscoveredAgent>> {
3186 let pw = self.presence.as_ref().ok_or_else(|| {
3187 error::NetworkError::NodeError("presence system not initialized".to_string())
3188 })?;
3189
3190 let topic = presence::global_presence_topic();
3191 let raw_results: Vec<(
3192 saorsa_gossip_types::PeerId,
3193 saorsa_gossip_types::PresenceRecord,
3194 )> = pw
3195 .manager()
3196 .initiate_foaf_query(topic, ttl, timeout_ms)
3197 .await
3198 .map_err(|e| error::NetworkError::NodeError(e.to_string()))?;
3199
3200 let cache = self.identity_discovery_cache.read().await;
3201
3202 let mut seen: std::collections::HashSet<identity::AgentId> =
3204 std::collections::HashSet::new();
3205 let mut agents: Vec<DiscoveredAgent> = Vec::with_capacity(raw_results.len());
3206
3207 for (peer_id, record) in &raw_results {
3208 if let Some(agent) =
3209 presence::presence_record_to_discovered_agent(*peer_id, record, &cache)
3210 {
3211 if seen.insert(agent.agent_id) {
3212 agents.push(agent);
3213 }
3214 }
3215 }
3216
3217 Ok(agents)
3218 }
3219
3220 pub async fn discover_agent_by_id(
3234 &self,
3235 target_id: identity::AgentId,
3236 ttl: u8,
3237 timeout_ms: u64,
3238 ) -> error::NetworkResult<Option<DiscoveredAgent>> {
3239 {
3241 let cache = self.identity_discovery_cache.read().await;
3242 if let Some(agent) = cache.get(&target_id) {
3243 return Ok(Some(agent.clone()));
3244 }
3245 }
3246
3247 let agents = self.discover_agents_foaf(ttl, timeout_ms).await?;
3249 Ok(agents.into_iter().find(|a| a.agent_id == target_id))
3250 }
3251
3252 pub async fn find_agent(
3269 &self,
3270 agent_id: identity::AgentId,
3271 ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
3272 self.start_identity_listener().await?;
3273
3274 if let Some(addrs) = self
3276 .identity_discovery_cache
3277 .read()
3278 .await
3279 .get(&agent_id)
3280 .map(|e| e.addresses.clone())
3281 {
3282 return Ok(Some(addrs));
3283 }
3284
3285 let runtime = match self.gossip_runtime.as_ref() {
3287 Some(r) => r,
3288 None => return Ok(None),
3289 };
3290 let shard_topic = shard_topic_for_agent(&agent_id);
3291 let mut sub = runtime.pubsub().subscribe(shard_topic).await;
3292 let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
3293 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
3294
3295 loop {
3296 if tokio::time::Instant::now() >= deadline {
3297 break;
3298 }
3299 let timeout = tokio::time::sleep_until(deadline);
3300 tokio::select! {
3301 Some(msg) = sub.recv() => {
3302 if let Ok(ann) = {
3303 use bincode::Options;
3304 bincode::DefaultOptions::new()
3305 .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
3306 .deserialize::<IdentityAnnouncement>(&msg.payload)
3307 } {
3308 if ann.verify().is_ok() && ann.agent_id == agent_id {
3309 let now = std::time::SystemTime::now()
3310 .duration_since(std::time::UNIX_EPOCH)
3311 .map_or(0, |d| d.as_secs());
3312 let filtered: Vec<std::net::SocketAddr> = ann
3313 .addresses
3314 .iter()
3315 .copied()
3316 .filter(|a| is_publicly_advertisable(*a))
3317 .collect();
3318 let addrs = filtered.clone();
3319 cache.write().await.insert(
3320 ann.agent_id,
3321 DiscoveredAgent {
3322 agent_id: ann.agent_id,
3323 machine_id: ann.machine_id,
3324 user_id: ann.user_id,
3325 addresses: filtered,
3326 announced_at: ann.announced_at,
3327 last_seen: now,
3328 machine_public_key: ann.machine_public_key.clone(),
3329 nat_type: ann.nat_type.clone(),
3330 can_receive_direct: ann.can_receive_direct,
3331 is_relay: ann.is_relay,
3332 is_coordinator: ann.is_coordinator,
3333 },
3334 );
3335 return Ok(Some(addrs));
3336 }
3337 }
3338 }
3339 _ = timeout => break,
3340 }
3341 }
3342
3343 if let Some(addrs) = self.find_agent_rendezvous(agent_id, 5).await? {
3346 let now = std::time::SystemTime::now()
3347 .duration_since(std::time::UNIX_EPOCH)
3348 .map_or(0, |d| d.as_secs());
3349 cache
3350 .write()
3351 .await
3352 .entry(agent_id)
3353 .or_insert_with(|| DiscoveredAgent {
3354 agent_id,
3355 machine_id: identity::MachineId([0u8; 32]),
3356 user_id: None,
3357 addresses: addrs.clone(),
3358 announced_at: now,
3359 last_seen: now,
3360 machine_public_key: Vec::new(),
3361 nat_type: None,
3362 can_receive_direct: None,
3363 is_relay: None,
3364 is_coordinator: None,
3365 });
3366 return Ok(Some(addrs));
3367 }
3368
3369 Ok(None)
3370 }
3371
3372 pub async fn find_agents_by_user(
3385 &self,
3386 user_id: identity::UserId,
3387 ) -> error::Result<Vec<DiscoveredAgent>> {
3388 self.start_identity_listener().await?;
3389 let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
3390 Ok(self
3391 .identity_discovery_cache
3392 .read()
3393 .await
3394 .values()
3395 .filter(|a| a.announced_at >= cutoff && a.user_id == Some(user_id))
3396 .cloned()
3397 .collect())
3398 }
3399
3400 #[must_use]
3408 pub fn local_addr(&self) -> Option<std::net::SocketAddr> {
3409 self.network.as_ref().and_then(|n| n.local_addr())
3410 }
3411
3412 pub async fn bound_addr(&self) -> Option<std::net::SocketAddr> {
3418 if let Some(ref network) = self.network {
3419 let addr = network.bound_addr().await;
3420 match (addr, self.local_addr()) {
3424 (Some(bound), Some(config)) if config.is_ipv4() && bound.is_ipv6() => {
3425 Some(std::net::SocketAddr::new(config.ip(), bound.port()))
3426 }
3427 (Some(bound), _) => Some(bound),
3428 _ => None,
3429 }
3430 } else {
3431 None
3432 }
3433 }
3434
3435 pub fn build_announcement(
3443 &self,
3444 include_user: bool,
3445 consent: bool,
3446 ) -> error::Result<IdentityAnnouncement> {
3447 self.build_identity_announcement(include_user, consent)
3448 }
3449
3450 fn start_network_event_listener(&self) {
3461 if self
3462 .network_event_listener_started
3463 .swap(true, std::sync::atomic::Ordering::AcqRel)
3464 {
3465 return;
3466 }
3467
3468 let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
3469 return;
3470 };
3471 let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
3472 let dm = std::sync::Arc::clone(&self.direct_messaging);
3473
3474 tokio::spawn(async move {
3475 let mut rx = network.subscribe();
3476 tracing::info!("Network event reconciliation listener started");
3477
3478 loop {
3479 let event = match rx.recv().await {
3480 Ok(event) => event,
3481 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
3482 tracing::warn!("Network event listener lagged by {skipped} events");
3483 continue;
3484 }
3485 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
3486 };
3487
3488 match event {
3489 network::NetworkEvent::PeerConnected { peer_id, .. } => {
3490 let machine_id = identity::MachineId(peer_id);
3491 let cached_agent_id = {
3492 let cache = cache.read().await;
3493 cache
3494 .values()
3495 .find(|entry| entry.machine_id == machine_id)
3496 .map(|entry| entry.agent_id)
3497 };
3498 let agent_id = match cached_agent_id {
3499 Some(agent_id) => Some(agent_id),
3500 None => dm.lookup_agent(&machine_id).await,
3501 };
3502 if let Some(agent_id) = agent_id {
3503 dm.mark_connected(agent_id, machine_id).await;
3504 }
3505 }
3506 network::NetworkEvent::PeerDisconnected { peer_id } => {
3507 let machine_id = identity::MachineId(peer_id);
3508 let cached_agent_id = {
3509 let cache = cache.read().await;
3510 cache
3511 .values()
3512 .find(|entry| entry.machine_id == machine_id)
3513 .map(|entry| entry.agent_id)
3514 };
3515 let agent_id = match cached_agent_id {
3516 Some(agent_id) => Some(agent_id),
3517 None => dm.lookup_agent(&machine_id).await,
3518 };
3519 if let Some(agent_id) = agent_id {
3520 dm.mark_disconnected(&agent_id).await;
3521 }
3522 }
3523 _ => {}
3524 }
3525 }
3526 });
3527 }
3528
3529 fn start_direct_listener(&self) {
3537 if self
3538 .direct_listener_started
3539 .swap(true, std::sync::atomic::Ordering::AcqRel)
3540 {
3541 return;
3542 }
3543
3544 let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
3545 return;
3546 };
3547 let dm = std::sync::Arc::clone(&self.direct_messaging);
3548 let discovery_cache = std::sync::Arc::clone(&self.identity_discovery_cache);
3549 let contact_store = std::sync::Arc::clone(&self.contact_store);
3550
3551 tokio::spawn(async move {
3552 tracing::info!(target: "x0x::direct", stage = "listener", "direct message listener started");
3553 loop {
3554 let Some((ant_peer_id, payload)) = network.recv_direct().await else {
3555 tracing::warn!(
3556 target: "x0x::direct",
3557 stage = "listener",
3558 "network.recv_direct channel closed — listener exiting"
3559 );
3560 break;
3561 };
3562
3563 let raw_bytes = payload.len();
3564
3565 if payload.len() < 32 {
3567 tracing::warn!(
3568 target: "x0x::direct",
3569 stage = "listener",
3570 machine_prefix = %network::hex_prefix(&ant_peer_id.0, 4),
3571 raw_bytes,
3572 outcome = "drop_too_short",
3573 "direct message too short to contain sender id"
3574 );
3575 continue;
3576 }
3577
3578 let mut sender_bytes = [0u8; 32];
3579 sender_bytes.copy_from_slice(&payload[..32]);
3580 let sender = identity::AgentId(sender_bytes);
3581 let machine_id = identity::MachineId(ant_peer_id.0);
3582 let data = payload[32..].to_vec();
3583 let payload_bytes = data.len();
3584
3585 let verified = {
3587 let cache = discovery_cache.read().await;
3588 cache
3589 .get(&sender)
3590 .map(|entry| entry.machine_id == machine_id)
3591 .unwrap_or(false)
3592 };
3593
3594 let trust_decision = {
3596 let contacts = contact_store.read().await;
3597 let evaluator = trust::TrustEvaluator::new(&contacts);
3598 let ctx = trust::TrustContext {
3599 agent_id: &sender,
3600 machine_id: &machine_id,
3601 };
3602 Some(evaluator.evaluate(&ctx))
3603 };
3604
3605 tracing::info!(
3606 target: "x0x::direct",
3607 stage = "recv",
3608 sender_prefix = %network::hex_prefix(&sender.0, 4),
3609 machine_prefix = %network::hex_prefix(&machine_id.0, 4),
3610 raw_bytes,
3611 payload_bytes,
3612 verified,
3613 trust_decision = ?trust_decision,
3614 "direct message received; dispatching to subscribers"
3615 );
3616
3617 dm.mark_connected(sender, machine_id).await;
3619
3620 dm.handle_incoming(machine_id, sender, data, verified, trust_decision)
3622 .await;
3623
3624 tracing::debug!(
3625 target: "x0x::direct",
3626 stage = "recv",
3627 sender_prefix = %network::hex_prefix(&sender.0, 4),
3628 payload_bytes,
3629 subscriber_count = dm.subscriber_count(),
3630 "direct message dispatched"
3631 );
3632 }
3633 });
3634 }
3635
3636 pub async fn start_identity_heartbeat(&self) -> error::Result<()> {
3644 let mut handle_guard = self.heartbeat_handle.lock().await;
3645 if handle_guard.is_some() {
3646 return Ok(());
3647 }
3648 let Some(runtime) = self.gossip_runtime.as_ref().map(std::sync::Arc::clone) else {
3649 return Err(error::IdentityError::Storage(std::io::Error::other(
3650 "gossip runtime not initialized — cannot start heartbeat",
3651 )));
3652 };
3653 let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
3654 return Err(error::IdentityError::Storage(std::io::Error::other(
3655 "network not initialized — cannot start heartbeat",
3656 )));
3657 };
3658 let ctx = HeartbeatContext {
3659 identity: std::sync::Arc::clone(&self.identity),
3660 runtime,
3661 network,
3662 interval_secs: self.heartbeat_interval_secs,
3663 cache: std::sync::Arc::clone(&self.identity_discovery_cache),
3664 user_identity_consented: std::sync::Arc::clone(&self.user_identity_consented),
3665 };
3666 let handle = tokio::task::spawn(async move {
3667 let mut ticker =
3668 tokio::time::interval(std::time::Duration::from_secs(ctx.interval_secs));
3669 ticker.tick().await; loop {
3671 ticker.tick().await;
3672 if let Err(e) = ctx.announce().await {
3673 tracing::warn!("identity heartbeat announce failed: {e}");
3674 }
3675 }
3676 });
3677 *handle_guard = Some(handle);
3678 Ok(())
3679 }
3680
3681 pub async fn advertise_identity(&self, validity_ms: u64) -> error::Result<()> {
3711 use saorsa_gossip_rendezvous::{Capability, ProviderSummary};
3712
3713 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3714 error::IdentityError::Storage(std::io::Error::other(
3715 "gossip runtime not initialized — cannot advertise identity",
3716 ))
3717 })?;
3718
3719 let peer_id = runtime.peer_id();
3720 let addresses = self.announcement_addresses();
3721 let addr_bytes = bincode::serialize(&addresses).map_err(|e| {
3722 error::IdentityError::Serialization(format!(
3723 "failed to serialize addresses for rendezvous: {e}"
3724 ))
3725 })?;
3726
3727 let mut summary = ProviderSummary::new(
3728 self.agent_id().0,
3729 peer_id,
3730 vec![Capability::Identity],
3731 validity_ms,
3732 )
3733 .with_extensions(addr_bytes);
3734
3735 summary
3736 .sign_raw(self.identity.machine_keypair().secret_key().as_bytes())
3737 .map_err(|e| {
3738 error::IdentityError::Storage(std::io::Error::other(format!(
3739 "failed to sign rendezvous summary: {e}"
3740 )))
3741 })?;
3742
3743 let cbor_bytes = summary.to_cbor().map_err(|e| {
3744 error::IdentityError::Serialization(format!(
3745 "failed to CBOR-encode rendezvous summary: {e}"
3746 ))
3747 })?;
3748
3749 let topic = rendezvous_shard_topic_for_agent(&self.agent_id());
3750 runtime
3751 .pubsub()
3752 .publish(topic, bytes::Bytes::from(cbor_bytes))
3753 .await
3754 .map_err(|e| {
3755 error::IdentityError::Storage(std::io::Error::other(format!(
3756 "failed to publish rendezvous summary: {e}"
3757 )))
3758 })?;
3759
3760 self.rendezvous_advertised
3761 .store(true, std::sync::atomic::Ordering::Relaxed);
3762 Ok(())
3763 }
3764
3765 pub async fn find_agent_rendezvous(
3778 &self,
3779 agent_id: identity::AgentId,
3780 timeout_secs: u64,
3781 ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
3782 use saorsa_gossip_rendezvous::ProviderSummary;
3783
3784 let runtime = match self.gossip_runtime.as_ref() {
3785 Some(r) => r,
3786 None => return Ok(None),
3787 };
3788
3789 let topic = rendezvous_shard_topic_for_agent(&agent_id);
3790 let mut sub = runtime.pubsub().subscribe(topic).await;
3791 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3792
3793 loop {
3794 if tokio::time::Instant::now() >= deadline {
3795 break;
3796 }
3797 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
3798 tokio::select! {
3799 Some(msg) = sub.recv() => {
3800 let summary = match ProviderSummary::from_cbor(&msg.payload) {
3801 Ok(s) => s,
3802 Err(_) => continue,
3803 };
3804 if summary.target != agent_id.0 {
3805 continue;
3806 }
3807 let cached_pub = self
3813 .identity_discovery_cache
3814 .read()
3815 .await
3816 .get(&agent_id)
3817 .map(|e| e.machine_public_key.clone());
3818 if let Some(pub_bytes) = cached_pub {
3819 if !pub_bytes.is_empty()
3820 && !summary.verify_raw(&pub_bytes).unwrap_or(false)
3821 {
3822 tracing::warn!(
3823 "Rendezvous summary signature verification failed for agent {:?}; discarding",
3824 agent_id
3825 );
3826 continue;
3827 }
3828 }
3829 let addrs: Vec<std::net::SocketAddr> = summary
3831 .extensions
3832 .as_deref()
3833 .and_then(|b| {
3834 use bincode::Options;
3835 bincode::DefaultOptions::new()
3836 .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
3837 .deserialize(b)
3838 .ok()
3839 })
3840 .unwrap_or_default();
3841 if !addrs.is_empty() {
3842 return Ok(Some(addrs));
3843 }
3844 }
3845 _ = tokio::time::sleep(remaining) => break,
3846 }
3847 }
3848
3849 Ok(None)
3850 }
3851
3852 #[doc(hidden)]
3858 pub async fn insert_discovered_agent_for_testing(&self, agent: DiscoveredAgent) {
3859 let agent_id = agent.agent_id;
3860 let machine_id = agent.machine_id;
3861 self.identity_discovery_cache
3862 .write()
3863 .await
3864 .insert(agent_id, agent);
3865
3866 if machine_id.0 != [0u8; 32] {
3867 self.direct_messaging
3868 .register_agent(agent_id, machine_id)
3869 .await;
3870 if let Some(ref network) = self.network {
3871 let ant_peer_id = ant_quic::PeerId(machine_id.0);
3872 if network.is_connected(&ant_peer_id).await {
3873 self.direct_messaging
3874 .mark_connected(agent_id, machine_id)
3875 .await;
3876 }
3877 }
3878 }
3879 }
3880
3881 pub async fn create_task_list(&self, name: &str, topic: &str) -> error::Result<TaskListHandle> {
3905 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3906 error::IdentityError::Storage(std::io::Error::other(
3907 "gossip runtime not initialized - configure agent with network first",
3908 ))
3909 })?;
3910
3911 let peer_id = runtime.peer_id();
3912 let list_id = crdt::TaskListId::from_content(name, &self.agent_id(), 0);
3913 let task_list = crdt::TaskList::new(list_id, name.to_string(), peer_id);
3914
3915 let sync = crdt::TaskListSync::new(
3916 task_list,
3917 std::sync::Arc::clone(runtime.pubsub()),
3918 topic.to_string(),
3919 30,
3920 )
3921 .map_err(|e| {
3922 error::IdentityError::Storage(std::io::Error::other(format!(
3923 "task list sync creation failed: {}",
3924 e
3925 )))
3926 })?;
3927
3928 let sync = std::sync::Arc::new(sync);
3929 sync.start().await.map_err(|e| {
3930 error::IdentityError::Storage(std::io::Error::other(format!(
3931 "task list sync start failed: {}",
3932 e
3933 )))
3934 })?;
3935
3936 Ok(TaskListHandle {
3937 sync,
3938 agent_id: self.agent_id(),
3939 peer_id,
3940 })
3941 }
3942
3943 pub async fn join_task_list(&self, topic: &str) -> error::Result<TaskListHandle> {
3966 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3967 error::IdentityError::Storage(std::io::Error::other(
3968 "gossip runtime not initialized - configure agent with network first",
3969 ))
3970 })?;
3971
3972 let peer_id = runtime.peer_id();
3973 let list_id = crdt::TaskListId::from_content(topic, &self.agent_id(), 0);
3975 let task_list = crdt::TaskList::new(list_id, String::new(), peer_id);
3976
3977 let sync = crdt::TaskListSync::new(
3978 task_list,
3979 std::sync::Arc::clone(runtime.pubsub()),
3980 topic.to_string(),
3981 30,
3982 )
3983 .map_err(|e| {
3984 error::IdentityError::Storage(std::io::Error::other(format!(
3985 "task list sync creation failed: {}",
3986 e
3987 )))
3988 })?;
3989
3990 let sync = std::sync::Arc::new(sync);
3991 sync.start().await.map_err(|e| {
3992 error::IdentityError::Storage(std::io::Error::other(format!(
3993 "task list sync start failed: {}",
3994 e
3995 )))
3996 })?;
3997
3998 Ok(TaskListHandle {
3999 sync,
4000 agent_id: self.agent_id(),
4001 peer_id,
4002 })
4003 }
4004}
4005
4006impl AgentBuilder {
4007 pub fn with_machine_key<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4019 self.machine_key_path = Some(path.as_ref().to_path_buf());
4020 self
4021 }
4022
4023 pub fn with_agent_key(mut self, keypair: identity::AgentKeypair) -> Self {
4042 self.agent_keypair = Some(keypair);
4043 self
4044 }
4045
4046 pub fn with_agent_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4061 self.agent_key_path = Some(path.as_ref().to_path_buf());
4062 self
4063 }
4064
4065 #[must_use]
4080 pub fn with_agent_cert_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4081 self.agent_cert_path = Some(path.as_ref().to_path_buf());
4082 self
4083 }
4084
4085 pub fn with_network_config(mut self, config: network::NetworkConfig) -> Self {
4097 self.network_config = Some(config);
4098 self
4099 }
4100
4101 pub fn with_peer_cache_dir<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4107 self.peer_cache_dir = Some(path.as_ref().to_path_buf());
4108 self
4109 }
4110
4111 pub fn with_peer_cache_disabled(mut self) -> Self {
4120 self.disable_peer_cache = true;
4121 self
4122 }
4123
4124 pub fn with_user_key(mut self, keypair: identity::UserKeypair) -> Self {
4142 self.user_keypair = Some(keypair);
4143 self
4144 }
4145
4146 pub fn with_user_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4162 self.user_key_path = Some(path.as_ref().to_path_buf());
4163 self
4164 }
4165
4166 #[must_use]
4174 pub fn with_heartbeat_interval(mut self, secs: u64) -> Self {
4175 self.heartbeat_interval_secs = Some(secs);
4176 self
4177 }
4178
4179 #[must_use]
4190 pub fn with_identity_ttl(mut self, secs: u64) -> Self {
4191 self.identity_ttl_secs = Some(secs);
4192 self
4193 }
4194
4195 #[must_use]
4197 pub fn with_presence_beacon_interval(mut self, secs: u64) -> Self {
4198 self.presence_beacon_interval_secs = Some(secs);
4199 self
4200 }
4201
4202 #[must_use]
4204 pub fn with_presence_event_poll_interval(mut self, secs: u64) -> Self {
4205 self.presence_event_poll_interval_secs = Some(secs);
4206 self
4207 }
4208
4209 #[must_use]
4211 pub fn with_presence_offline_timeout(mut self, secs: u64) -> Self {
4212 self.presence_offline_timeout_secs = Some(secs);
4213 self
4214 }
4215
4216 #[must_use]
4225 pub fn with_contact_store_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4226 self.contact_store_path = Some(path.as_ref().to_path_buf());
4227 self
4228 }
4229
4230 pub async fn build(self) -> error::Result<Agent> {
4246 let machine_keypair = if let Some(path) = self.machine_key_path {
4248 match storage::load_machine_keypair_from(&path).await {
4250 Ok(kp) => kp,
4251 Err(_) => {
4252 let kp = identity::MachineKeypair::generate()?;
4254 storage::save_machine_keypair_to(&kp, &path).await?;
4255 kp
4256 }
4257 }
4258 } else if storage::machine_keypair_exists().await {
4259 storage::load_machine_keypair().await?
4261 } else {
4262 let kp = identity::MachineKeypair::generate()?;
4264 storage::save_machine_keypair(&kp).await?;
4265 kp
4266 };
4267
4268 let agent_keypair = if let Some(kp) = self.agent_keypair {
4270 kp
4272 } else if let Some(path) = self.agent_key_path {
4273 match storage::load_agent_keypair_from(&path).await {
4275 Ok(kp) => kp,
4276 Err(_) => {
4277 let kp = identity::AgentKeypair::generate()?;
4278 storage::save_agent_keypair_to(&kp, &path).await?;
4279 kp
4280 }
4281 }
4282 } else if storage::agent_keypair_exists().await {
4283 storage::load_agent_keypair_default().await?
4285 } else {
4286 let kp = identity::AgentKeypair::generate()?;
4288 storage::save_agent_keypair_default(&kp).await?;
4289 kp
4290 };
4291
4292 let user_keypair = if let Some(kp) = self.user_keypair {
4294 Some(kp)
4295 } else if let Some(path) = self.user_key_path {
4296 storage::load_user_keypair_from(&path).await.ok()
4298 } else if storage::user_keypair_exists().await {
4299 storage::load_user_keypair().await.ok()
4301 } else {
4302 None
4303 };
4304
4305 let identity = if let Some(user_kp) = user_keypair {
4323 let cert_path = self.agent_cert_path.clone();
4324 let existing_cert = if let Some(ref p) = cert_path {
4325 if tokio::fs::try_exists(p).await.unwrap_or(false) {
4326 storage::load_agent_certificate_from(p).await.ok()
4327 } else {
4328 None
4329 }
4330 } else if storage::agent_certificate_exists().await {
4331 storage::load_agent_certificate().await.ok()
4332 } else {
4333 None
4334 };
4335
4336 let cert_still_valid = existing_cert.as_ref().is_some_and(|c| {
4337 let user_match = c
4338 .user_id()
4339 .map(|uid| uid == user_kp.user_id())
4340 .unwrap_or(false);
4341 let agent_match = c
4342 .agent_id()
4343 .map(|aid| aid == agent_keypair.agent_id())
4344 .unwrap_or(false);
4345 user_match && agent_match
4346 });
4347
4348 let cert = if cert_still_valid {
4349 existing_cert.expect("cert_still_valid implies Some")
4350 } else {
4351 let new_cert = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
4352 if let Some(ref p) = cert_path {
4353 storage::save_agent_certificate_to(&new_cert, p).await?;
4354 } else {
4355 storage::save_agent_certificate(&new_cert).await?;
4356 }
4357 new_cert
4358 };
4359 identity::Identity::new_with_user(machine_keypair, agent_keypair, user_kp, cert)
4360 } else {
4361 identity::Identity::new(machine_keypair, agent_keypair)
4362 };
4363
4364 let bootstrap_cache = if self.network_config.is_some() && !self.disable_peer_cache {
4367 let cache_dir = self.peer_cache_dir.unwrap_or_else(|| {
4368 dirs::home_dir()
4369 .unwrap_or_else(|| std::path::PathBuf::from("."))
4370 .join(".x0x")
4371 .join("peers")
4372 });
4373 let config = ant_quic::BootstrapCacheConfig::builder()
4374 .cache_dir(cache_dir)
4375 .min_peers_to_save(1)
4376 .build();
4377 match ant_quic::BootstrapCache::open(config).await {
4378 Ok(cache) => {
4379 let cache = std::sync::Arc::new(cache);
4380 std::sync::Arc::clone(&cache).start_maintenance();
4381 Some(cache)
4382 }
4383 Err(e) => {
4384 tracing::warn!("Failed to open bootstrap cache: {e}");
4385 None
4386 }
4387 }
4388 } else {
4389 None
4390 };
4391
4392 let machine_keypair = {
4395 let pk = ant_quic::MlDsaPublicKey::from_bytes(
4396 identity.machine_keypair().public_key().as_bytes(),
4397 )
4398 .map_err(|e| {
4399 error::IdentityError::Storage(std::io::Error::other(format!(
4400 "invalid machine public key: {e}"
4401 )))
4402 })?;
4403 let sk = ant_quic::MlDsaSecretKey::from_bytes(
4404 identity.machine_keypair().secret_key().as_bytes(),
4405 )
4406 .map_err(|e| {
4407 error::IdentityError::Storage(std::io::Error::other(format!(
4408 "invalid machine secret key: {e}"
4409 )))
4410 })?;
4411 Some((pk, sk))
4412 };
4413
4414 let network = if let Some(config) = self.network_config {
4415 let node = network::NetworkNode::new(config, bootstrap_cache.clone(), machine_keypair)
4416 .await
4417 .map_err(|e| {
4418 error::IdentityError::Storage(std::io::Error::other(format!(
4419 "network initialization failed: {}",
4420 e
4421 )))
4422 })?;
4423
4424 debug_assert_eq!(
4426 node.peer_id().0,
4427 identity.machine_id().0,
4428 "ant-quic PeerId must equal MachineId after identity unification"
4429 );
4430
4431 Some(std::sync::Arc::new(node))
4432 } else {
4433 None
4434 };
4435
4436 let signing_ctx = std::sync::Arc::new(gossip::SigningContext::from_keypair(
4438 identity.agent_keypair(),
4439 ));
4440
4441 let gossip_runtime = if let Some(ref net) = network {
4443 let runtime = gossip::GossipRuntime::new(
4444 gossip::GossipConfig::default(),
4445 std::sync::Arc::clone(net),
4446 Some(signing_ctx),
4447 )
4448 .await
4449 .map_err(|e| {
4450 error::IdentityError::Storage(std::io::Error::other(format!(
4451 "gossip runtime initialization failed: {}",
4452 e
4453 )))
4454 })?;
4455 Some(std::sync::Arc::new(runtime))
4456 } else {
4457 None
4458 };
4459
4460 let contacts_path = self.contact_store_path.unwrap_or_else(|| {
4462 dirs::home_dir()
4463 .unwrap_or_else(|| std::path::PathBuf::from("."))
4464 .join(".x0x")
4465 .join("contacts.json")
4466 });
4467 let contact_store = std::sync::Arc::new(tokio::sync::RwLock::new(
4468 contacts::ContactStore::new(contacts_path),
4469 ));
4470
4471 let gossip_cache_adapter = bootstrap_cache.as_ref().map(|cache| {
4473 saorsa_gossip_coordinator::GossipCacheAdapter::new(std::sync::Arc::clone(cache))
4474 });
4475
4476 let direct_messaging = std::sync::Arc::new(direct::DirectMessaging::new());
4478
4479 let presence = if let Some(ref net) = network {
4481 let peer_id = saorsa_gossip_transport::GossipTransport::local_peer_id(net.as_ref());
4482 let mut presence_config = presence::PresenceConfig::default();
4483 if let Some(secs) = self.presence_beacon_interval_secs {
4484 presence_config.beacon_interval_secs = secs;
4485 }
4486 if let Some(secs) = self.presence_event_poll_interval_secs {
4487 presence_config.event_poll_interval_secs = secs;
4488 }
4489 if let Some(secs) = self.presence_offline_timeout_secs {
4490 presence_config.adaptive_timeout_fallback_secs = secs;
4491 }
4492 let pw = presence::PresenceWrapper::new(
4493 peer_id,
4494 std::sync::Arc::clone(net),
4495 presence_config,
4496 bootstrap_cache.clone(),
4497 )
4498 .map_err(|e| {
4499 error::IdentityError::Storage(std::io::Error::other(format!(
4500 "presence initialization failed: {}",
4501 e
4502 )))
4503 })?;
4504 let pw_arc = std::sync::Arc::new(pw);
4505 if let Some(ref rt) = gossip_runtime {
4507 rt.set_presence(std::sync::Arc::clone(&pw_arc));
4508 }
4509 Some(pw_arc)
4510 } else {
4511 None
4512 };
4513
4514 Ok(Agent {
4515 identity: std::sync::Arc::new(identity),
4516 network,
4517 gossip_runtime,
4518 bootstrap_cache,
4519 gossip_cache_adapter,
4520 identity_discovery_cache: std::sync::Arc::new(tokio::sync::RwLock::new(
4521 std::collections::HashMap::new(),
4522 )),
4523 identity_listener_started: std::sync::atomic::AtomicBool::new(false),
4524 heartbeat_interval_secs: self
4525 .heartbeat_interval_secs
4526 .unwrap_or(IDENTITY_HEARTBEAT_INTERVAL_SECS),
4527 identity_ttl_secs: self.identity_ttl_secs.unwrap_or(IDENTITY_TTL_SECS),
4528 heartbeat_handle: tokio::sync::Mutex::new(None),
4529 rendezvous_advertised: std::sync::atomic::AtomicBool::new(false),
4530 contact_store,
4531 direct_messaging,
4532 network_event_listener_started: std::sync::atomic::AtomicBool::new(false),
4533 direct_listener_started: std::sync::atomic::AtomicBool::new(false),
4534 presence,
4535 user_identity_consented: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
4536 capability_store: std::sync::Arc::new(dm_capability::CapabilityStore::new()),
4537 dm_capabilities_tx: std::sync::Arc::new({
4538 let (tx, _rx) = tokio::sync::watch::channel(dm::DmCapabilities::pending());
4539 tx
4540 }),
4541 dm_inflight_acks: std::sync::Arc::new(dm::InFlightAcks::new()),
4542 recent_delivery_cache: std::sync::Arc::new(dm::RecentDeliveryCache::with_defaults()),
4543 capability_advert_service: tokio::sync::Mutex::new(None),
4544 dm_inbox_service: tokio::sync::Mutex::new(None),
4545 })
4546 }
4547}
4548
4549#[derive(Clone)]
4563pub struct TaskListHandle {
4564 sync: std::sync::Arc<crdt::TaskListSync>,
4565 agent_id: identity::AgentId,
4566 peer_id: saorsa_gossip_types::PeerId,
4567}
4568
4569impl std::fmt::Debug for TaskListHandle {
4570 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4571 f.debug_struct("TaskListHandle")
4572 .field("agent_id", &self.agent_id)
4573 .field("peer_id", &self.peer_id)
4574 .finish_non_exhaustive()
4575 }
4576}
4577
4578impl TaskListHandle {
4579 pub async fn add_task(
4594 &self,
4595 title: String,
4596 description: String,
4597 ) -> error::Result<crdt::TaskId> {
4598 let (task_id, delta) = {
4599 let mut list = self.sync.write().await;
4600 let seq = list.next_seq();
4601 let task_id = crdt::TaskId::new(&title, &self.agent_id, seq);
4602 let metadata = crdt::TaskMetadata::new(title, description, 128, self.agent_id, seq);
4603 let task = crdt::TaskItem::new(task_id, metadata, self.peer_id);
4604 list.add_task(task.clone(), self.peer_id, seq)
4605 .map_err(|e| {
4606 error::IdentityError::Storage(std::io::Error::other(format!(
4607 "add_task failed: {}",
4608 e
4609 )))
4610 })?;
4611 let tag = (self.peer_id, seq);
4612 let delta = crdt::TaskListDelta::for_add(task_id, task, tag, list.current_version());
4613 (task_id, delta)
4614 };
4615 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4617 tracing::warn!("failed to publish add_task delta: {}", e);
4618 }
4619 Ok(task_id)
4620 }
4621
4622 pub async fn claim_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
4632 let delta = {
4633 let mut list = self.sync.write().await;
4634 let seq = list.next_seq();
4635 list.claim_task(&task_id, self.agent_id, self.peer_id, seq)
4636 .map_err(|e| {
4637 error::IdentityError::Storage(std::io::Error::other(format!(
4638 "claim_task failed: {}",
4639 e
4640 )))
4641 })?;
4642 let full_task = list
4644 .get_task(&task_id)
4645 .ok_or_else(|| {
4646 error::IdentityError::Storage(std::io::Error::other(
4647 "task disappeared after claim",
4648 ))
4649 })?
4650 .clone();
4651 crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
4652 };
4653 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4654 tracing::warn!("failed to publish claim_task delta: {}", e);
4655 }
4656 Ok(())
4657 }
4658
4659 pub async fn complete_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
4669 let delta = {
4670 let mut list = self.sync.write().await;
4671 let seq = list.next_seq();
4672 list.complete_task(&task_id, self.agent_id, self.peer_id, seq)
4673 .map_err(|e| {
4674 error::IdentityError::Storage(std::io::Error::other(format!(
4675 "complete_task failed: {}",
4676 e
4677 )))
4678 })?;
4679 let full_task = list
4680 .get_task(&task_id)
4681 .ok_or_else(|| {
4682 error::IdentityError::Storage(std::io::Error::other(
4683 "task disappeared after complete",
4684 ))
4685 })?
4686 .clone();
4687 crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
4688 };
4689 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4690 tracing::warn!("failed to publish complete_task delta: {}", e);
4691 }
4692 Ok(())
4693 }
4694
4695 pub async fn list_tasks(&self) -> error::Result<Vec<TaskSnapshot>> {
4705 let list = self.sync.read().await;
4706 let tasks = list.tasks_ordered();
4707 Ok(tasks
4708 .into_iter()
4709 .map(|task| TaskSnapshot {
4710 id: *task.id(),
4711 title: task.title().to_string(),
4712 description: task.description().to_string(),
4713 state: task.current_state(),
4714 assignee: task.assignee().copied(),
4715 owner: None,
4716 priority: task.priority(),
4717 })
4718 .collect())
4719 }
4720
4721 pub async fn reorder(&self, task_ids: Vec<crdt::TaskId>) -> error::Result<()> {
4731 let delta = {
4732 let mut list = self.sync.write().await;
4733 list.reorder(task_ids.clone(), self.peer_id).map_err(|e| {
4734 error::IdentityError::Storage(std::io::Error::other(format!(
4735 "reorder failed: {}",
4736 e
4737 )))
4738 })?;
4739 crdt::TaskListDelta::for_reorder(task_ids, list.current_version())
4740 };
4741 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4742 tracing::warn!("failed to publish reorder delta: {}", e);
4743 }
4744 Ok(())
4745 }
4746}
4747
4748impl Agent {
4753 pub async fn create_kv_store(&self, name: &str, topic: &str) -> error::Result<KvStoreHandle> {
4762 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
4763 error::IdentityError::Storage(std::io::Error::other(
4764 "gossip runtime not initialized - configure agent with network first",
4765 ))
4766 })?;
4767
4768 let peer_id = runtime.peer_id();
4769 let store_id = kv::KvStoreId::from_content(name, &self.agent_id());
4770 let store = kv::KvStore::new(
4771 store_id,
4772 name.to_string(),
4773 self.agent_id(),
4774 kv::AccessPolicy::Signed,
4775 );
4776
4777 let sync = kv::KvStoreSync::new(
4778 store,
4779 std::sync::Arc::clone(runtime.pubsub()),
4780 topic.to_string(),
4781 30,
4782 )
4783 .map_err(|e| {
4784 error::IdentityError::Storage(std::io::Error::other(format!(
4785 "kv store sync creation failed: {e}",
4786 )))
4787 })?;
4788
4789 let sync = std::sync::Arc::new(sync);
4790 sync.start().await.map_err(|e| {
4791 error::IdentityError::Storage(std::io::Error::other(format!(
4792 "kv store sync start failed: {e}",
4793 )))
4794 })?;
4795
4796 Ok(KvStoreHandle {
4797 sync,
4798 agent_id: self.agent_id(),
4799 peer_id,
4800 })
4801 }
4802
4803 pub async fn join_kv_store(&self, topic: &str) -> error::Result<KvStoreHandle> {
4813 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
4814 error::IdentityError::Storage(std::io::Error::other(
4815 "gossip runtime not initialized - configure agent with network first",
4816 ))
4817 })?;
4818
4819 let peer_id = runtime.peer_id();
4820 let store_id = kv::KvStoreId::from_content(topic, &self.agent_id());
4821 let store = kv::KvStore::new(
4824 store_id,
4825 String::new(),
4826 self.agent_id(),
4827 kv::AccessPolicy::Encrypted {
4828 group_id: Vec::new(),
4829 },
4830 );
4831
4832 let sync = kv::KvStoreSync::new(
4833 store,
4834 std::sync::Arc::clone(runtime.pubsub()),
4835 topic.to_string(),
4836 30,
4837 )
4838 .map_err(|e| {
4839 error::IdentityError::Storage(std::io::Error::other(format!(
4840 "kv store sync creation failed: {e}",
4841 )))
4842 })?;
4843
4844 let sync = std::sync::Arc::new(sync);
4845 sync.start().await.map_err(|e| {
4846 error::IdentityError::Storage(std::io::Error::other(format!(
4847 "kv store sync start failed: {e}",
4848 )))
4849 })?;
4850
4851 Ok(KvStoreHandle {
4852 sync,
4853 agent_id: self.agent_id(),
4854 peer_id,
4855 })
4856 }
4857}
4858
4859#[derive(Clone)]
4864pub struct KvStoreHandle {
4865 sync: std::sync::Arc<kv::KvStoreSync>,
4866 agent_id: identity::AgentId,
4867 peer_id: saorsa_gossip_types::PeerId,
4868}
4869
4870impl std::fmt::Debug for KvStoreHandle {
4871 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4872 f.debug_struct("KvStoreHandle")
4873 .field("agent_id", &self.agent_id)
4874 .field("peer_id", &self.peer_id)
4875 .finish_non_exhaustive()
4876 }
4877}
4878
4879impl KvStoreHandle {
4880 pub async fn put(
4889 &self,
4890 key: String,
4891 value: Vec<u8>,
4892 content_type: String,
4893 ) -> error::Result<()> {
4894 let delta = {
4895 let mut store = self.sync.write().await;
4896 store
4897 .put(
4898 key.clone(),
4899 value.clone(),
4900 content_type.clone(),
4901 self.peer_id,
4902 )
4903 .map_err(|e| {
4904 error::IdentityError::Storage(std::io::Error::other(format!(
4905 "kv put failed: {e}",
4906 )))
4907 })?;
4908 let entry = store.get(&key).cloned();
4909 let version = store.current_version();
4910 match entry {
4911 Some(e) => {
4912 kv::KvStoreDelta::for_put(key, e, (self.peer_id, store.next_seq()), version)
4913 }
4914 None => return Ok(()), }
4916 };
4917 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4918 tracing::warn!("failed to publish kv put delta: {e}");
4919 }
4920 Ok(())
4921 }
4922
4923 pub async fn get(&self, key: &str) -> error::Result<Option<KvEntrySnapshot>> {
4931 let store = self.sync.read().await;
4932 Ok(store.get(key).map(|e| KvEntrySnapshot {
4933 key: e.key.clone(),
4934 value: e.value.clone(),
4935 content_hash: hex::encode(e.content_hash),
4936 content_type: e.content_type.clone(),
4937 metadata: e.metadata.clone(),
4938 created_at: e.created_at,
4939 updated_at: e.updated_at,
4940 }))
4941 }
4942
4943 pub async fn remove(&self, key: &str) -> error::Result<()> {
4949 let delta = {
4950 let mut store = self.sync.write().await;
4951 store.remove(key).map_err(|e| {
4952 error::IdentityError::Storage(std::io::Error::other(format!(
4953 "kv remove failed: {e}",
4954 )))
4955 })?;
4956 let mut d = kv::KvStoreDelta::new(store.current_version());
4957 d.removed
4958 .insert(key.to_string(), std::collections::HashSet::new());
4959 d
4960 };
4961 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4962 tracing::warn!("failed to publish kv remove delta: {e}");
4963 }
4964 Ok(())
4965 }
4966
4967 pub async fn keys(&self) -> error::Result<Vec<KvEntrySnapshot>> {
4973 let store = self.sync.read().await;
4974 Ok(store
4975 .active_entries()
4976 .into_iter()
4977 .map(|e| KvEntrySnapshot {
4978 key: e.key.clone(),
4979 value: e.value.clone(),
4980 content_hash: hex::encode(e.content_hash),
4981 content_type: e.content_type.clone(),
4982 metadata: e.metadata.clone(),
4983 created_at: e.created_at,
4984 updated_at: e.updated_at,
4985 })
4986 .collect())
4987 }
4988
4989 pub async fn name(&self) -> error::Result<String> {
4995 let store = self.sync.read().await;
4996 Ok(store.name().to_string())
4997 }
4998}
4999
5000#[derive(Debug, Clone, serde::Serialize)]
5002pub struct KvEntrySnapshot {
5003 pub key: String,
5005 pub value: Vec<u8>,
5007 pub content_hash: String,
5009 pub content_type: String,
5011 pub metadata: std::collections::HashMap<String, String>,
5013 pub created_at: u64,
5015 pub updated_at: u64,
5017}
5018
5019#[derive(Debug, Clone)]
5024pub struct TaskSnapshot {
5025 pub id: crdt::TaskId,
5027 pub title: String,
5029 pub description: String,
5031 pub state: crdt::CheckboxState,
5033 pub assignee: Option<identity::AgentId>,
5035 pub owner: Option<identity::UserId>,
5037 pub priority: u8,
5039}
5040
5041pub const VERSION: &str = env!("CARGO_PKG_VERSION");
5043
5044pub const NAME: &str = "x0x";
5046
5047#[cfg(test)]
5048mod tests {
5049 use super::*;
5050
5051 fn sa(s: &str) -> std::net::SocketAddr {
5052 s.parse().expect("valid SocketAddr literal in test")
5053 }
5054
5055 #[test]
5056 fn is_publicly_advertisable_rejects_lan_and_special_scopes() {
5057 assert!(
5059 !is_publicly_advertisable(sa("127.0.0.1:5483")),
5060 "loopback v4"
5061 );
5062 assert!(!is_publicly_advertisable(sa("10.1.2.3:5483")), "rfc1918 /8");
5063 assert!(
5064 !is_publicly_advertisable(sa("172.20.0.5:5483")),
5065 "rfc1918 /12"
5066 );
5067 assert!(
5068 !is_publicly_advertisable(sa("192.168.1.5:5483")),
5069 "rfc1918 /16"
5070 );
5071 assert!(
5072 !is_publicly_advertisable(sa("169.254.1.1:5483")),
5073 "link-local v4"
5074 );
5075 assert!(
5076 !is_publicly_advertisable(sa("100.64.1.1:5483")),
5077 "CGNAT (unreachable outside carrier)"
5078 );
5079 assert!(
5080 !is_publicly_advertisable(sa("0.0.0.0:5483")),
5081 "unspecified v4"
5082 );
5083
5084 assert!(!is_publicly_advertisable(sa("[::1]:5483")), "loopback v6");
5086 assert!(
5087 !is_publicly_advertisable(sa("[fe80::1]:5483")),
5088 "link-local v6"
5089 );
5090 assert!(!is_publicly_advertisable(sa("[fd00::1]:5483")), "ULA v6");
5091
5092 assert!(
5094 !is_publicly_advertisable(sa("1.2.3.4:0")),
5095 "port 0 on global v4"
5096 );
5097
5098 assert!(is_publicly_advertisable(sa("1.2.3.4:5483")), "global v4");
5100 assert!(
5101 is_publicly_advertisable(sa("[2001:db8::1]:5483")),
5102 "global v6 (documentation doc but is_globally_routable permits)",
5103 );
5104 assert!(
5105 is_publicly_advertisable(sa("8.8.8.8:9000")),
5106 "global v4 on non-default port",
5107 );
5108
5109 assert!(
5112 !is_publicly_advertisable(sa("192.0.2.1:5483")),
5113 "TEST-NET-1 documentation range"
5114 );
5115 assert!(
5116 !is_publicly_advertisable(sa("203.0.113.10:5483")),
5117 "TEST-NET-3 documentation range"
5118 );
5119 }
5120
5121 #[test]
5122 fn presence_parse_addr_hints_drops_private_scopes() {
5123 let hints = vec![
5127 "127.0.0.1:5483".to_string(),
5128 "10.200.0.1:5483".to_string(),
5129 "[fd00::1]:5483".to_string(),
5130 "1.2.3.4:5483".to_string(),
5131 "[2001:db8::1]:5483".to_string(),
5132 "not-an-address".to_string(),
5133 ];
5134 let parsed = presence::parse_addr_hints(&hints);
5135 let got: Vec<String> = parsed.iter().map(|a| a.to_string()).collect();
5136 assert_eq!(
5137 got,
5138 vec!["1.2.3.4:5483".to_string(), "[2001:db8::1]:5483".to_string()],
5139 "only globally-advertisable addresses survive inbound parsing"
5140 );
5141 }
5142
5143 #[test]
5144 fn name_is_palindrome() {
5145 let name = NAME;
5146 let reversed: String = name.chars().rev().collect();
5147 assert_eq!(name, reversed, "x0x must be a palindrome");
5148 }
5149
5150 #[test]
5151 fn name_is_three_bytes() {
5152 assert_eq!(NAME.len(), 3, "x0x must be exactly three bytes");
5153 }
5154
5155 #[test]
5156 fn name_is_ai_native() {
5157 assert!(NAME.chars().all(|c| c.is_ascii_alphanumeric()));
5160 }
5161
5162 #[tokio::test]
5163 async fn agent_creates() {
5164 let agent = Agent::new().await;
5165 assert!(agent.is_ok());
5166 }
5167
5168 #[tokio::test]
5169 async fn agent_joins_network() {
5170 let agent = Agent::new().await.unwrap();
5171 assert!(agent.join_network().await.is_ok());
5172 }
5173
5174 #[tokio::test]
5175 async fn agent_subscribes() {
5176 let agent = Agent::new().await.unwrap();
5177 assert!(agent.subscribe("test-topic").await.is_err());
5179 }
5180
5181 #[tokio::test]
5182 async fn identity_announcement_machine_signature_verifies() {
5183 let agent = Agent::builder()
5184 .with_network_config(network::NetworkConfig::default())
5185 .build()
5186 .await
5187 .unwrap();
5188
5189 let announcement = agent.build_identity_announcement(false, false).unwrap();
5190 assert_eq!(announcement.agent_id, agent.agent_id());
5191 assert_eq!(announcement.machine_id, agent.machine_id());
5192 assert!(announcement.user_id.is_none());
5193 assert!(announcement.agent_certificate.is_none());
5194 assert!(announcement.verify().is_ok());
5195 }
5196
5197 #[tokio::test]
5198 async fn identity_announcement_requires_human_consent() {
5199 let agent = Agent::builder()
5200 .with_network_config(network::NetworkConfig::default())
5201 .build()
5202 .await
5203 .unwrap();
5204
5205 let err = agent.build_identity_announcement(true, false).unwrap_err();
5206 assert!(
5207 err.to_string().contains("explicit human consent"),
5208 "unexpected error: {err}"
5209 );
5210 }
5211
5212 #[tokio::test]
5213 async fn identity_announcement_with_user_requires_user_identity() {
5214 let agent = Agent::builder()
5215 .with_network_config(network::NetworkConfig::default())
5216 .build()
5217 .await
5218 .unwrap();
5219
5220 let err = agent.build_identity_announcement(true, true).unwrap_err();
5221 assert!(
5222 err.to_string().contains("no user identity is configured"),
5223 "unexpected error: {err}"
5224 );
5225 }
5226
5227 #[tokio::test]
5228 async fn announce_identity_populates_discovery_cache() {
5229 let user_key = identity::UserKeypair::generate().unwrap();
5230 let agent = Agent::builder()
5231 .with_network_config(network::NetworkConfig::default())
5232 .with_user_key(user_key)
5233 .build()
5234 .await
5235 .unwrap();
5236
5237 agent.announce_identity(true, true).await.unwrap();
5238 let discovered = agent.discovered_agent(agent.agent_id()).await.unwrap();
5239 let entry = discovered.expect("agent should discover its own announcement");
5240
5241 assert_eq!(entry.agent_id, agent.agent_id());
5242 assert_eq!(entry.machine_id, agent.machine_id());
5243 assert_eq!(entry.user_id, agent.user_id());
5244 }
5245
5246 #[test]
5250 fn identity_announcement_backward_compat_no_nat_fields() {
5251 use identity::{AgentId, MachineId};
5252
5253 #[derive(serde::Serialize, serde::Deserialize)]
5256 struct OldIdentityAnnouncementUnsigned {
5257 agent_id: AgentId,
5258 machine_id: MachineId,
5259 user_id: Option<identity::UserId>,
5260 agent_certificate: Option<identity::AgentCertificate>,
5261 machine_public_key: Vec<u8>,
5262 addresses: Vec<std::net::SocketAddr>,
5263 announced_at: u64,
5264 }
5265
5266 let agent_id = AgentId([1u8; 32]);
5267 let machine_id = MachineId([2u8; 32]);
5268 let old = OldIdentityAnnouncementUnsigned {
5269 agent_id,
5270 machine_id,
5271 user_id: None,
5272 agent_certificate: None,
5273 machine_public_key: vec![0u8; 10],
5274 addresses: Vec::new(),
5275 announced_at: 1234,
5276 };
5277 let bytes = bincode::serialize(&old).expect("serialize old announcement");
5278
5279 let result = bincode::deserialize::<IdentityAnnouncementUnsigned>(&bytes);
5287 assert!(
5290 result.is_err(),
5291 "Old-format announcement should not decode as new struct (protocol upgrade required)"
5292 );
5293 }
5294
5295 #[test]
5297 fn identity_announcement_nat_fields_round_trip() {
5298 use identity::{AgentId, MachineId};
5299
5300 let unsigned = IdentityAnnouncementUnsigned {
5301 agent_id: AgentId([1u8; 32]),
5302 machine_id: MachineId([2u8; 32]),
5303 user_id: None,
5304 agent_certificate: None,
5305 machine_public_key: vec![0u8; 10],
5306 addresses: Vec::new(),
5307 announced_at: 9999,
5308 nat_type: Some("FullCone".to_string()),
5309 can_receive_direct: Some(true),
5310 is_relay: Some(false),
5311 is_coordinator: Some(true),
5312 };
5313 let bytes = bincode::serialize(&unsigned).expect("serialize");
5314 let decoded: IdentityAnnouncementUnsigned =
5315 bincode::deserialize(&bytes).expect("deserialize");
5316 assert_eq!(decoded.nat_type.as_deref(), Some("FullCone"));
5317 assert_eq!(decoded.can_receive_direct, Some(true));
5318 assert_eq!(decoded.is_relay, Some(false));
5319 assert_eq!(decoded.is_coordinator, Some(true));
5320 }
5321
5322 #[test]
5325 fn identity_announcement_no_nat_fields_round_trip() {
5326 use identity::{AgentId, MachineId};
5327
5328 let unsigned = IdentityAnnouncementUnsigned {
5329 agent_id: AgentId([3u8; 32]),
5330 machine_id: MachineId([4u8; 32]),
5331 user_id: None,
5332 agent_certificate: None,
5333 machine_public_key: vec![0u8; 10],
5334 addresses: Vec::new(),
5335 announced_at: 42,
5336 nat_type: None,
5337 can_receive_direct: None,
5338 is_relay: None,
5339 is_coordinator: None,
5340 };
5341 let bytes = bincode::serialize(&unsigned).expect("serialize");
5342 let decoded: IdentityAnnouncementUnsigned =
5343 bincode::deserialize(&bytes).expect("deserialize");
5344 assert!(decoded.nat_type.is_none());
5345 assert!(decoded.can_receive_direct.is_none());
5346 assert!(decoded.is_relay.is_none());
5347 assert!(decoded.is_coordinator.is_none());
5348 }
5349}