1#![allow(clippy::unwrap_used)]
2#![allow(clippy::expect_used)]
3#![allow(missing_docs)]
4
5pub mod error;
55
56pub mod identity;
62
63pub mod storage;
68
69pub mod bootstrap;
74pub mod network;
76
77pub mod contacts;
79
80pub mod trust;
85
86pub mod connectivity;
91
92pub mod gossip;
94
95pub mod crdt;
97
98pub mod kv;
100
101pub mod groups;
103
104pub mod mls;
106
107pub mod direct;
112
113pub mod presence;
115
116pub mod upgrade;
118
119pub mod files;
121
122pub mod constitution;
124
125pub mod api;
127
128pub mod cli;
130
131pub use gossip::{
133 GossipConfig, GossipRuntime, PubSubManager, PubSubMessage, SigningContext, Subscription,
134};
135
136pub use direct::{DirectMessage, DirectMessageReceiver, DirectMessaging};
138
139use saorsa_gossip_membership::Membership as _;
141
142pub struct Agent {
164 identity: std::sync::Arc<identity::Identity>,
165 #[allow(dead_code)]
167 network: Option<std::sync::Arc<network::NetworkNode>>,
168 gossip_runtime: Option<std::sync::Arc<gossip::GossipRuntime>>,
170 bootstrap_cache: Option<std::sync::Arc<ant_quic::BootstrapCache>>,
172 gossip_cache_adapter: Option<saorsa_gossip_coordinator::GossipCacheAdapter>,
174 identity_discovery_cache: std::sync::Arc<
176 tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
177 >,
178 identity_listener_started: std::sync::atomic::AtomicBool,
180 heartbeat_interval_secs: u64,
182 identity_ttl_secs: u64,
184 heartbeat_handle: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
186 rendezvous_advertised: std::sync::atomic::AtomicBool,
188 contact_store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>,
190 direct_messaging: std::sync::Arc<direct::DirectMessaging>,
192 direct_listener_started: std::sync::atomic::AtomicBool,
194 presence: Option<std::sync::Arc<presence::PresenceWrapper>>,
196}
197
198impl std::fmt::Debug for Agent {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 f.debug_struct("Agent")
201 .field("identity", &self.identity)
202 .field("network", &self.network.is_some())
203 .field("gossip_runtime", &self.gossip_runtime.is_some())
204 .field("bootstrap_cache", &self.bootstrap_cache.is_some())
205 .field("gossip_cache_adapter", &self.gossip_cache_adapter.is_some())
206 .finish()
207 }
208}
209
210#[derive(Debug, Clone)]
212pub struct Message {
213 pub origin: String,
215 pub payload: Vec<u8>,
217 pub topic: String,
219}
220
221pub const IDENTITY_ANNOUNCE_TOPIC: &str = "x0x.identity.announce.v1";
223
224#[must_use]
234pub fn shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
235 let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
236 format!("x0x.identity.shard.{shard}")
237}
238
239pub const RENDEZVOUS_SHARD_TOPIC_PREFIX: &str = "x0x.rendezvous.shard";
241
242#[must_use]
248pub fn rendezvous_shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
249 let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
250 format!("{RENDEZVOUS_SHARD_TOPIC_PREFIX}.{shard}")
251}
252
253pub const IDENTITY_HEARTBEAT_INTERVAL_SECS: u64 = 300;
255
256pub const IDENTITY_TTL_SECS: u64 = 900;
261
262#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
263struct IdentityAnnouncementUnsigned {
264 agent_id: identity::AgentId,
265 machine_id: identity::MachineId,
266 user_id: Option<identity::UserId>,
267 agent_certificate: Option<identity::AgentCertificate>,
268 machine_public_key: Vec<u8>,
269 addresses: Vec<std::net::SocketAddr>,
270 announced_at: u64,
271 nat_type: Option<String>,
273 can_receive_direct: Option<bool>,
275 is_relay: Option<bool>,
277 is_coordinator: Option<bool>,
279}
280
281#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
286pub struct IdentityAnnouncement {
287 pub agent_id: identity::AgentId,
289 pub machine_id: identity::MachineId,
291 pub user_id: Option<identity::UserId>,
293 pub agent_certificate: Option<identity::AgentCertificate>,
295 pub machine_public_key: Vec<u8>,
297 pub machine_signature: Vec<u8>,
299 pub addresses: Vec<std::net::SocketAddr>,
301 pub announced_at: u64,
303 pub nat_type: Option<String>,
306 pub can_receive_direct: Option<bool>,
309 pub is_relay: Option<bool>,
312 pub is_coordinator: Option<bool>,
315}
316
317impl IdentityAnnouncement {
318 fn to_unsigned(&self) -> IdentityAnnouncementUnsigned {
319 IdentityAnnouncementUnsigned {
320 agent_id: self.agent_id,
321 machine_id: self.machine_id,
322 user_id: self.user_id,
323 agent_certificate: self.agent_certificate.clone(),
324 machine_public_key: self.machine_public_key.clone(),
325 addresses: self.addresses.clone(),
326 announced_at: self.announced_at,
327 nat_type: self.nat_type.clone(),
328 can_receive_direct: self.can_receive_direct,
329 is_relay: self.is_relay,
330 is_coordinator: self.is_coordinator,
331 }
332 }
333
334 pub fn verify(&self) -> error::Result<()> {
336 let machine_pub =
337 ant_quic::MlDsaPublicKey::from_bytes(&self.machine_public_key).map_err(|_| {
338 error::IdentityError::CertificateVerification(
339 "invalid machine public key in announcement".to_string(),
340 )
341 })?;
342 let derived_machine_id = identity::MachineId::from_public_key(&machine_pub);
343 if derived_machine_id != self.machine_id {
344 return Err(error::IdentityError::CertificateVerification(
345 "machine_id does not match machine public key".to_string(),
346 ));
347 }
348
349 let unsigned_bytes = bincode::serialize(&self.to_unsigned()).map_err(|e| {
350 error::IdentityError::Serialization(format!(
351 "failed to serialize announcement for verification: {e}"
352 ))
353 })?;
354 let signature = ant_quic::crypto::raw_public_keys::pqc::MlDsaSignature::from_bytes(
355 &self.machine_signature,
356 )
357 .map_err(|e| {
358 error::IdentityError::CertificateVerification(format!(
359 "invalid machine signature in announcement: {:?}",
360 e
361 ))
362 })?;
363 ant_quic::crypto::raw_public_keys::pqc::verify_with_ml_dsa(
364 &machine_pub,
365 &unsigned_bytes,
366 &signature,
367 )
368 .map_err(|e| {
369 error::IdentityError::CertificateVerification(format!(
370 "machine signature verification failed: {:?}",
371 e
372 ))
373 })?;
374
375 match (self.user_id, self.agent_certificate.as_ref()) {
376 (Some(user_id), Some(cert)) => {
377 cert.verify()?;
378 let cert_agent_id = cert.agent_id()?;
379 if cert_agent_id != self.agent_id {
380 return Err(error::IdentityError::CertificateVerification(
381 "agent certificate agent_id mismatch".to_string(),
382 ));
383 }
384 let cert_user_id = cert.user_id()?;
385 if cert_user_id != user_id {
386 return Err(error::IdentityError::CertificateVerification(
387 "agent certificate user_id mismatch".to_string(),
388 ));
389 }
390 Ok(())
391 }
392 (None, None) => Ok(()),
393 _ => Err(error::IdentityError::CertificateVerification(
394 "user identity disclosure requires matching certificate".to_string(),
395 )),
396 }
397 }
398}
399
400#[derive(Debug, Clone)]
402pub struct DiscoveredAgent {
403 pub agent_id: identity::AgentId,
405 pub machine_id: identity::MachineId,
407 pub user_id: Option<identity::UserId>,
409 pub addresses: Vec<std::net::SocketAddr>,
411 pub announced_at: u64,
413 pub last_seen: u64,
415 #[doc(hidden)]
420 pub machine_public_key: Vec<u8>,
421 pub nat_type: Option<String>,
424 pub can_receive_direct: Option<bool>,
427 pub is_relay: Option<bool>,
430 pub is_coordinator: Option<bool>,
433}
434
435#[derive(Debug)]
472pub struct AgentBuilder {
473 machine_key_path: Option<std::path::PathBuf>,
474 agent_keypair: Option<identity::AgentKeypair>,
475 agent_key_path: Option<std::path::PathBuf>,
476 user_keypair: Option<identity::UserKeypair>,
477 user_key_path: Option<std::path::PathBuf>,
478 #[allow(dead_code)]
479 network_config: Option<network::NetworkConfig>,
480 peer_cache_dir: Option<std::path::PathBuf>,
481 heartbeat_interval_secs: Option<u64>,
482 identity_ttl_secs: Option<u64>,
483 contact_store_path: Option<std::path::PathBuf>,
485}
486
487struct HeartbeatContext {
489 identity: std::sync::Arc<identity::Identity>,
490 runtime: std::sync::Arc<gossip::GossipRuntime>,
491 network: std::sync::Arc<network::NetworkNode>,
492 interval_secs: u64,
493 cache: std::sync::Arc<
494 tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
495 >,
496}
497
498impl HeartbeatContext {
499 async fn announce(&self) -> error::Result<()> {
500 let machine_public_key = self
501 .identity
502 .machine_keypair()
503 .public_key()
504 .as_bytes()
505 .to_vec();
506 let announced_at = Agent::unix_timestamp_secs();
507
508 let mut addresses = match self.network.node_status().await {
511 Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
512 _ => match self.network.routable_addr().await {
513 Some(addr) => vec![addr],
514 None => Vec::new(),
515 },
516 };
517
518 let port = addresses.first().map(|a| a.port()).unwrap_or(5483);
522 if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
523 if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
524 if let Ok(local) = sock.local_addr() {
525 if let std::net::IpAddr::V6(v6) = local.ip() {
526 let segs = v6.segments();
527 let is_global = (segs[0] & 0xffc0) != 0xfe80
528 && (segs[0] & 0xff00) != 0xfd00
529 && !v6.is_loopback();
530 if is_global {
531 let v6_addr = std::net::SocketAddr::new(std::net::IpAddr::V6(v6), port);
532 if !addresses.contains(&v6_addr) {
533 addresses.push(v6_addr);
534 }
535 }
536 }
537 }
538 }
539 }
540
541 let (nat_type, can_receive_direct, is_relay, is_coordinator) =
543 match self.network.node_status().await {
544 Some(status) => (
545 Some(status.nat_type.to_string()),
546 Some(status.can_receive_direct),
547 Some(status.is_relaying),
548 Some(status.is_coordinating),
549 ),
550 None => (None, None, None, None),
551 };
552
553 let unsigned = IdentityAnnouncementUnsigned {
554 agent_id: self.identity.agent_id(),
555 machine_id: self.identity.machine_id(),
556 user_id: self
557 .identity
558 .user_keypair()
559 .map(identity::UserKeypair::user_id),
560 agent_certificate: self.identity.agent_certificate().cloned(),
561 machine_public_key: machine_public_key.clone(),
562 addresses,
563 announced_at,
564 nat_type: nat_type.clone(),
565 can_receive_direct,
566 is_relay,
567 is_coordinator,
568 };
569 let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
570 error::IdentityError::Serialization(format!(
571 "heartbeat: failed to serialize announcement: {e}"
572 ))
573 })?;
574 let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
575 self.identity.machine_keypair().secret_key(),
576 &unsigned_bytes,
577 )
578 .map_err(|e| {
579 error::IdentityError::Storage(std::io::Error::other(format!(
580 "heartbeat: failed to sign announcement: {:?}",
581 e
582 )))
583 })?
584 .as_bytes()
585 .to_vec();
586
587 let announcement = IdentityAnnouncement {
588 agent_id: unsigned.agent_id,
589 machine_id: unsigned.machine_id,
590 user_id: unsigned.user_id,
591 agent_certificate: unsigned.agent_certificate,
592 machine_public_key: machine_public_key.clone(),
593 machine_signature,
594 addresses: unsigned.addresses,
595 announced_at,
596 nat_type,
597 can_receive_direct,
598 is_relay,
599 is_coordinator,
600 };
601 let encoded = bincode::serialize(&announcement).map_err(|e| {
602 error::IdentityError::Serialization(format!(
603 "heartbeat: failed to serialize announcement: {e}"
604 ))
605 })?;
606 self.runtime
607 .pubsub()
608 .publish(
609 IDENTITY_ANNOUNCE_TOPIC.to_string(),
610 bytes::Bytes::from(encoded),
611 )
612 .await
613 .map_err(|e| {
614 error::IdentityError::Storage(std::io::Error::other(format!(
615 "heartbeat: publish failed: {e}"
616 )))
617 })?;
618 let now = Agent::unix_timestamp_secs();
619 self.cache.write().await.insert(
620 announcement.agent_id,
621 DiscoveredAgent {
622 agent_id: announcement.agent_id,
623 machine_id: announcement.machine_id,
624 user_id: announcement.user_id,
625 addresses: announcement.addresses,
626 announced_at: announcement.announced_at,
627 last_seen: now,
628 machine_public_key: machine_public_key.clone(),
629 nat_type: announcement.nat_type.clone(),
630 can_receive_direct: announcement.can_receive_direct,
631 is_relay: announcement.is_relay,
632 is_coordinator: announcement.is_coordinator,
633 },
634 );
635 Ok(())
636 }
637}
638
639impl Agent {
640 pub async fn new() -> error::Result<Self> {
647 Agent::builder().build().await
648 }
649
650 pub fn builder() -> AgentBuilder {
657 AgentBuilder {
658 machine_key_path: None,
659 agent_keypair: None,
660 agent_key_path: None,
661 user_keypair: None,
662 user_key_path: None,
663 network_config: None,
664 peer_cache_dir: None,
665 heartbeat_interval_secs: None,
666 identity_ttl_secs: None,
667 contact_store_path: None,
668 }
669 }
670
671 #[inline]
677 #[must_use]
678 pub fn identity(&self) -> &identity::Identity {
679 &self.identity
680 }
681
682 #[inline]
691 #[must_use]
692 pub fn machine_id(&self) -> identity::MachineId {
693 self.identity.machine_id()
694 }
695
696 #[inline]
706 #[must_use]
707 pub fn agent_id(&self) -> identity::AgentId {
708 self.identity.agent_id()
709 }
710
711 #[inline]
716 #[must_use]
717 pub fn user_id(&self) -> Option<identity::UserId> {
718 self.identity.user_id()
719 }
720
721 #[inline]
725 #[must_use]
726 pub fn agent_certificate(&self) -> Option<&identity::AgentCertificate> {
727 self.identity.agent_certificate()
728 }
729
730 #[must_use]
732 pub fn network(&self) -> Option<&std::sync::Arc<network::NetworkNode>> {
733 self.network.as_ref()
734 }
735
736 pub fn gossip_cache_adapter(&self) -> Option<&saorsa_gossip_coordinator::GossipCacheAdapter> {
741 self.gossip_cache_adapter.as_ref()
742 }
743
744 #[must_use]
750 pub fn presence_system(&self) -> Option<&std::sync::Arc<presence::PresenceWrapper>> {
751 self.presence.as_ref()
752 }
753
754 #[must_use]
762 pub fn contacts(&self) -> &std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>> {
763 &self.contact_store
764 }
765
766 pub async fn reachability(
772 &self,
773 agent_id: &identity::AgentId,
774 ) -> Option<connectivity::ReachabilityInfo> {
775 let cache = self.identity_discovery_cache.read().await;
776 cache
777 .get(agent_id)
778 .map(connectivity::ReachabilityInfo::from_discovered)
779 }
780
781 pub async fn connect_to_agent(
799 &self,
800 agent_id: &identity::AgentId,
801 ) -> error::Result<connectivity::ConnectOutcome> {
802 let discovered = {
804 let cache = self.identity_discovery_cache.read().await;
805 cache.get(agent_id).cloned()
806 };
807
808 let agent = match discovered {
809 Some(a) => a,
810 None => return Ok(connectivity::ConnectOutcome::NotFound),
811 };
812
813 let info = connectivity::ReachabilityInfo::from_discovered(&agent);
814
815 if info.addresses.is_empty() {
816 return Ok(connectivity::ConnectOutcome::Unreachable);
817 }
818
819 let Some(ref network) = self.network else {
820 return Ok(connectivity::ConnectOutcome::Unreachable);
821 };
822
823 let machine_peer_id = ant_quic::PeerId(agent.machine_id.0);
825 if network.is_connected(&machine_peer_id).await {
826 self.direct_messaging
827 .mark_connected(agent.agent_id, agent.machine_id)
828 .await;
829 if let Some(addr) = info.addresses.first() {
831 return Ok(connectivity::ConnectOutcome::Direct(*addr));
832 }
833 }
834
835 if info.likely_direct() {
837 for addr in &info.addresses {
838 match network.connect_addr(*addr).await {
839 Ok(connected_peer_id) => {
840 let real_machine_id = identity::MachineId(connected_peer_id.0);
843 if let Some(ref bc) = self.bootstrap_cache {
845 bc.add_from_connection(connected_peer_id, vec![*addr], None)
846 .await;
847 }
848 {
850 let mut cache = self.identity_discovery_cache.write().await;
851 if let Some(entry) = cache.get_mut(agent_id) {
852 entry.machine_id = real_machine_id;
853 }
854 }
855 self.direct_messaging
857 .mark_connected(agent.agent_id, real_machine_id)
858 .await;
859 return Ok(connectivity::ConnectOutcome::Direct(*addr));
860 }
861 Err(e) => {
862 tracing::debug!("Direct connect to {} failed: {}", addr, e);
863 }
864 }
865 }
866 }
867
868 if info.needs_coordination() || !info.likely_direct() {
872 {
877 let cache = self.identity_discovery_cache.read().await;
878 let reachable: Vec<std::net::SocketAddr> = cache
879 .values()
880 .filter(|a| a.can_receive_direct == Some(true))
881 .flat_map(|a| a.addresses.clone())
882 .take(6)
883 .collect();
884 drop(cache);
885 for addr in &reachable {
886 if network.connect_addr(*addr).await.is_ok() {
887 tracing::debug!(
888 addr = %addr,
889 "Connected to reachable peer for NAT coordination"
890 );
891 break;
892 }
893 }
894 }
895
896 for addr in &info.addresses {
897 match network.connect_addr(*addr).await {
898 Ok(connected_peer_id) => {
899 let real_machine_id = identity::MachineId(connected_peer_id.0);
900 if let Some(ref bc) = self.bootstrap_cache {
901 bc.add_from_connection(connected_peer_id, vec![*addr], None)
902 .await;
903 }
904 {
906 let mut cache = self.identity_discovery_cache.write().await;
907 if let Some(entry) = cache.get_mut(agent_id) {
908 entry.machine_id = real_machine_id;
909 }
910 }
911 self.direct_messaging
913 .mark_connected(agent.agent_id, real_machine_id)
914 .await;
915 return Ok(connectivity::ConnectOutcome::Coordinated(*addr));
916 }
917 Err(e) => {
918 tracing::debug!("Coordinated connect to {} failed: {}", addr, e);
919 }
920 }
921 }
922 }
923
924 Ok(connectivity::ConnectOutcome::Unreachable)
925 }
926
927 pub async fn shutdown(&self) {
933 if let Some(ref pw) = self.presence {
935 pw.shutdown().await;
936 tracing::info!("Presence system shut down");
937 }
938
939 if let Some(ref cache) = self.bootstrap_cache {
940 if let Err(e) = cache.save().await {
941 tracing::warn!("Failed to save bootstrap cache on shutdown: {e}");
942 } else {
943 tracing::info!("Bootstrap cache saved on shutdown");
944 }
945 }
946 }
947
948 pub async fn send_direct(
978 &self,
979 agent_id: &identity::AgentId,
980 payload: Vec<u8>,
981 ) -> error::NetworkResult<()> {
982 let network = self.network.as_ref().ok_or_else(|| {
983 error::NetworkError::NodeCreation("network not initialized".to_string())
984 })?;
985
986 let cached_machine_id = {
988 let cache = self.identity_discovery_cache.read().await;
989 cache
990 .get(agent_id)
991 .map(|d| d.machine_id)
992 .filter(|m| m.0 != [0u8; 32]) };
994 let machine_id = match cached_machine_id {
995 Some(id) => id,
996 None => {
997 match self.direct_messaging.get_machine_id(agent_id).await {
1000 Some(id) => id,
1001 None => {
1002 let _ = self.connect_to_agent(agent_id).await;
1005 self.direct_messaging
1006 .get_machine_id(agent_id)
1007 .await
1008 .ok_or(error::NetworkError::AgentNotFound(agent_id.0))?
1009 }
1010 }
1011 }
1012 };
1013
1014 let ant_peer_id = ant_quic::PeerId(machine_id.0);
1016 if !network.is_connected(&ant_peer_id).await {
1017 return Err(error::NetworkError::AgentNotConnected(agent_id.0));
1018 }
1019
1020 network
1022 .send_direct(&ant_peer_id, &self.identity.agent_id().0, &payload)
1023 .await?;
1024
1025 tracing::info!(
1026 "Sent {} bytes directly to agent {:?}",
1027 payload.len(),
1028 agent_id
1029 );
1030
1031 Ok(())
1032 }
1033
1034 pub async fn recv_direct(&self) -> Option<direct::DirectMessage> {
1059 self.recv_direct_inner().await
1060 }
1061
1062 pub async fn recv_direct_filtered(&self) -> Option<direct::DirectMessage> {
1091 loop {
1092 let msg = self.recv_direct_inner().await?;
1093
1094 let contacts = self.contact_store.read().await;
1096 if let Some(contact) = contacts.get(&msg.sender) {
1097 if contact.trust_level == contacts::TrustLevel::Blocked {
1098 tracing::debug!(
1099 "Dropping direct message from blocked agent {:?}",
1100 msg.sender
1101 );
1102 continue;
1103 }
1104 }
1105
1106 return Some(msg);
1107 }
1108 }
1109
1110 async fn recv_direct_inner(&self) -> Option<direct::DirectMessage> {
1117 self.direct_messaging.recv().await
1118 }
1119
1120 pub fn subscribe_direct(&self) -> direct::DirectMessageReceiver {
1136 self.direct_messaging.subscribe()
1137 }
1138
1139 pub fn direct_messaging(&self) -> &std::sync::Arc<direct::DirectMessaging> {
1143 &self.direct_messaging
1144 }
1145
1146 pub async fn is_agent_connected(&self, agent_id: &identity::AgentId) -> bool {
1156 let Some(network) = &self.network else {
1157 return false;
1158 };
1159
1160 let machine_id = {
1162 let cache = self.identity_discovery_cache.read().await;
1163 cache.get(agent_id).map(|d| d.machine_id)
1164 };
1165
1166 match machine_id {
1167 Some(mid) => {
1168 let ant_peer_id = ant_quic::PeerId(mid.0);
1169 network.is_connected(&ant_peer_id).await
1170 }
1171 None => false,
1172 }
1173 }
1174
1175 pub async fn connected_agents(&self) -> Vec<identity::AgentId> {
1180 let Some(network) = &self.network else {
1181 return Vec::new();
1182 };
1183
1184 let connected_peers = network.connected_peers().await;
1185 let cache = self.identity_discovery_cache.read().await;
1186
1187 cache
1189 .values()
1190 .filter(|agent| {
1191 let ant_peer_id = ant_quic::PeerId(agent.machine_id.0);
1192 connected_peers.contains(&ant_peer_id)
1193 })
1194 .map(|agent| agent.agent_id)
1195 .collect()
1196 }
1197
1198 pub fn set_contacts(&self, store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>) {
1206 if let Some(runtime) = &self.gossip_runtime {
1207 runtime.pubsub().set_contacts(store);
1208 }
1209 }
1210
1211 pub async fn announce_identity(
1229 &self,
1230 include_user_identity: bool,
1231 human_consent: bool,
1232 ) -> error::Result<()> {
1233 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1234 error::IdentityError::Storage(std::io::Error::other(
1235 "gossip runtime not initialized - configure agent with network first",
1236 ))
1237 })?;
1238
1239 self.start_identity_listener().await?;
1240
1241 let mut addresses = if let Some(network) = self.network.as_ref() {
1243 match network.node_status().await {
1244 Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
1245 _ => match network.routable_addr().await {
1246 Some(addr) => vec![addr],
1247 None => self.announcement_addresses(),
1248 },
1249 }
1250 } else {
1251 self.announcement_addresses()
1252 };
1253 let port = addresses.first().map(|a| a.port()).unwrap_or(5483);
1257
1258 if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
1260 if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
1261 if let Ok(local) = sock.local_addr() {
1262 if let std::net::IpAddr::V6(v6) = local.ip() {
1263 let segs = v6.segments();
1264 let is_global = (segs[0] & 0xffc0) != 0xfe80
1265 && (segs[0] & 0xff00) != 0xfd00
1266 && !v6.is_loopback();
1267 if is_global {
1268 let v6_addr = std::net::SocketAddr::new(std::net::IpAddr::V6(v6), port);
1269 if !addresses.contains(&v6_addr) {
1270 addresses.push(v6_addr);
1271 }
1272 }
1273 }
1274 }
1275 }
1276 }
1277 let announcement = self.build_identity_announcement_with_addrs(
1278 include_user_identity,
1279 human_consent,
1280 addresses,
1281 )?;
1282
1283 let encoded = bincode::serialize(&announcement).map_err(|e| {
1284 error::IdentityError::Serialization(format!(
1285 "failed to serialize identity announcement: {e}"
1286 ))
1287 })?;
1288
1289 let payload = bytes::Bytes::from(encoded);
1290
1291 let shard_topic = shard_topic_for_agent(&announcement.agent_id);
1293 runtime
1294 .pubsub()
1295 .publish(shard_topic, payload.clone())
1296 .await
1297 .map_err(|e| {
1298 error::IdentityError::Storage(std::io::Error::other(format!(
1299 "failed to publish identity announcement to shard topic: {e}"
1300 )))
1301 })?;
1302
1303 runtime
1305 .pubsub()
1306 .publish(IDENTITY_ANNOUNCE_TOPIC.to_string(), payload)
1307 .await
1308 .map_err(|e| {
1309 error::IdentityError::Storage(std::io::Error::other(format!(
1310 "failed to publish identity announcement: {e}"
1311 )))
1312 })?;
1313
1314 let now = Self::unix_timestamp_secs();
1315 self.identity_discovery_cache.write().await.insert(
1316 announcement.agent_id,
1317 DiscoveredAgent {
1318 agent_id: announcement.agent_id,
1319 machine_id: announcement.machine_id,
1320 user_id: announcement.user_id,
1321 addresses: announcement.addresses.clone(),
1322 announced_at: announcement.announced_at,
1323 last_seen: now,
1324 machine_public_key: announcement.machine_public_key.clone(),
1325 nat_type: announcement.nat_type.clone(),
1326 can_receive_direct: announcement.can_receive_direct,
1327 is_relay: announcement.is_relay,
1328 is_coordinator: announcement.is_coordinator,
1329 },
1330 );
1331
1332 Ok(())
1333 }
1334
1335 pub async fn discovered_agents(&self) -> error::Result<Vec<DiscoveredAgent>> {
1341 self.start_identity_listener().await?;
1342 let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
1343 let mut agents: Vec<_> = self
1344 .identity_discovery_cache
1345 .read()
1346 .await
1347 .values()
1348 .filter(|a| a.announced_at >= cutoff)
1349 .cloned()
1350 .collect();
1351 agents.sort_by(|a, b| a.agent_id.0.cmp(&b.agent_id.0));
1352 Ok(agents)
1353 }
1354
1355 pub async fn discovered_agents_unfiltered(&self) -> error::Result<Vec<DiscoveredAgent>> {
1364 self.start_identity_listener().await?;
1365 let mut agents: Vec<_> = self
1366 .identity_discovery_cache
1367 .read()
1368 .await
1369 .values()
1370 .cloned()
1371 .collect();
1372 agents.sort_by(|a, b| a.agent_id.0.cmp(&b.agent_id.0));
1373 Ok(agents)
1374 }
1375
1376 pub async fn discovered_agent(
1382 &self,
1383 agent_id: identity::AgentId,
1384 ) -> error::Result<Option<DiscoveredAgent>> {
1385 self.start_identity_listener().await?;
1386 Ok(self
1387 .identity_discovery_cache
1388 .read()
1389 .await
1390 .get(&agent_id)
1391 .cloned())
1392 }
1393
1394 async fn start_identity_listener(&self) -> error::Result<()> {
1395 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1396 error::IdentityError::Storage(std::io::Error::other(
1397 "gossip runtime not initialized - configure agent with network first",
1398 ))
1399 })?;
1400
1401 if self
1402 .identity_listener_started
1403 .swap(true, std::sync::atomic::Ordering::AcqRel)
1404 {
1405 return Ok(());
1406 }
1407
1408 let mut sub_legacy = runtime
1409 .pubsub()
1410 .subscribe(IDENTITY_ANNOUNCE_TOPIC.to_string())
1411 .await;
1412 let own_shard_topic = shard_topic_for_agent(&self.agent_id());
1413 let mut sub_shard = runtime.pubsub().subscribe(own_shard_topic).await;
1414 let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
1415 let bootstrap_cache = self.bootstrap_cache.clone();
1416 let contact_store = std::sync::Arc::clone(&self.contact_store);
1417 let network = self.network.as_ref().map(std::sync::Arc::clone);
1418 let own_agent_id = self.agent_id();
1419
1420 tokio::spawn(async move {
1421 let mut auto_connect_attempted = std::collections::HashSet::<identity::AgentId>::new();
1424
1425 loop {
1426 let msg = tokio::select! {
1428 Some(m) = sub_legacy.recv() => m,
1429 Some(m) = sub_shard.recv() => m,
1430 else => break,
1431 };
1432 let decoded = {
1433 use bincode::Options;
1434 bincode::options()
1435 .with_fixint_encoding()
1436 .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
1437 .allow_trailing_bytes()
1438 .deserialize::<IdentityAnnouncement>(&msg.payload)
1439 };
1440 let announcement = match decoded {
1441 Ok(a) => a,
1442 Err(e) => {
1443 tracing::debug!("Ignoring invalid identity announcement payload: {}", e);
1444 continue;
1445 }
1446 };
1447
1448 if let Err(e) = announcement.verify() {
1449 tracing::warn!("Ignoring unverifiable identity announcement: {}", e);
1450 continue;
1451 }
1452
1453 {
1456 let store = contact_store.read().await;
1457 let evaluator = trust::TrustEvaluator::new(&store);
1458 let decision = evaluator.evaluate(&trust::TrustContext {
1459 agent_id: &announcement.agent_id,
1460 machine_id: &announcement.machine_id,
1461 });
1462 match decision {
1463 trust::TrustDecision::RejectBlocked => {
1464 tracing::debug!(
1465 "Dropping identity announcement from blocked agent {:?}",
1466 hex::encode(&announcement.agent_id.0[..8]),
1467 );
1468 continue;
1469 }
1470 trust::TrustDecision::RejectMachineMismatch => {
1471 tracing::warn!(
1472 "Dropping identity announcement from agent {:?}: machine {:?} not in pinned list",
1473 hex::encode(&announcement.agent_id.0[..8]),
1474 hex::encode(&announcement.machine_id.0[..8]),
1475 );
1476 continue;
1477 }
1478 _ => {}
1479 }
1480 }
1481
1482 {
1484 let mut store = contact_store.write().await;
1485 let record = contacts::MachineRecord::new(announcement.machine_id, None);
1486 store.add_machine(&announcement.agent_id, record);
1487 }
1488
1489 let now = std::time::SystemTime::now()
1490 .duration_since(std::time::UNIX_EPOCH)
1491 .map_or(0, |d| d.as_secs());
1492
1493 if !announcement.addresses.is_empty() {
1497 if let Some(ref bc) = &bootstrap_cache {
1498 let peer_id = ant_quic::PeerId(announcement.machine_id.0);
1499 bc.add_from_connection(peer_id, announcement.addresses.clone(), None)
1500 .await;
1501 tracing::debug!(
1502 "Added {} addresses from identity announcement to bootstrap cache for agent {:?} (machine {:?})",
1503 announcement.addresses.len(),
1504 announcement.agent_id,
1505 hex::encode(&announcement.machine_id.0[..8]),
1506 );
1507 }
1508 }
1509
1510 cache.write().await.insert(
1511 announcement.agent_id,
1512 DiscoveredAgent {
1513 agent_id: announcement.agent_id,
1514 machine_id: announcement.machine_id,
1515 user_id: announcement.user_id,
1516 addresses: announcement.addresses.clone(),
1517 announced_at: announcement.announced_at,
1518 last_seen: now,
1519 machine_public_key: announcement.machine_public_key.clone(),
1520 nat_type: announcement.nat_type.clone(),
1521 can_receive_direct: announcement.can_receive_direct,
1522 is_relay: announcement.is_relay,
1523 is_coordinator: announcement.is_coordinator,
1524 },
1525 );
1526
1527 if announcement.agent_id != own_agent_id
1532 && !announcement.addresses.is_empty()
1533 && !auto_connect_attempted.contains(&announcement.agent_id)
1534 {
1535 if let Some(ref net) = &network {
1536 let ant_peer = ant_quic::PeerId(announcement.machine_id.0);
1537 if !net.is_connected(&ant_peer).await {
1538 auto_connect_attempted.insert(announcement.agent_id);
1539 let net = std::sync::Arc::clone(net);
1540 let addresses = announcement.addresses.clone();
1541 tokio::spawn(async move {
1542 for addr in &addresses {
1543 match net.connect_addr(*addr).await {
1544 Ok(_) => {
1545 tracing::info!(
1546 "Auto-connected to discovered agent at {addr}",
1547 );
1548 return;
1549 }
1550 Err(e) => {
1551 tracing::debug!("Auto-connect to {addr} failed: {e}",);
1552 }
1553 }
1554 }
1555 tracing::debug!(
1556 "Auto-connect exhausted all {} addresses for discovered agent",
1557 addresses.len(),
1558 );
1559 });
1560 }
1561 }
1562 }
1563 }
1564 });
1565
1566 Ok(())
1567 }
1568
1569 fn unix_timestamp_secs() -> u64 {
1570 std::time::SystemTime::now()
1571 .duration_since(std::time::UNIX_EPOCH)
1572 .map_or(0, |d| d.as_secs())
1573 }
1574
1575 fn announcement_addresses(&self) -> Vec<std::net::SocketAddr> {
1576 match self.network.as_ref().and_then(|n| n.local_addr()) {
1579 Some(addr) if addr.port() > 0 && !addr.ip().is_unspecified() => vec![addr],
1580 _ => Vec::new(),
1581 }
1582 }
1583
1584 fn build_identity_announcement(
1585 &self,
1586 include_user_identity: bool,
1587 human_consent: bool,
1588 ) -> error::Result<IdentityAnnouncement> {
1589 self.build_identity_announcement_with_addrs(
1590 include_user_identity,
1591 human_consent,
1592 self.announcement_addresses(),
1593 )
1594 }
1595
1596 fn build_identity_announcement_with_addrs(
1597 &self,
1598 include_user_identity: bool,
1599 human_consent: bool,
1600 addresses: Vec<std::net::SocketAddr>,
1601 ) -> error::Result<IdentityAnnouncement> {
1602 if include_user_identity && !human_consent {
1603 return Err(error::IdentityError::Storage(std::io::Error::other(
1604 "human identity disclosure requires explicit human consent — set human_consent: true in the request body",
1605 )));
1606 }
1607
1608 let (user_id, agent_certificate) = if include_user_identity {
1609 let user_id = self.user_id().ok_or_else(|| {
1610 error::IdentityError::Storage(std::io::Error::other(
1611 "human identity disclosure requested but no user identity is configured — set user_key_path in your config.toml to point at your user keypair file",
1612 ))
1613 })?;
1614 let cert = self.agent_certificate().cloned().ok_or_else(|| {
1615 error::IdentityError::Storage(std::io::Error::other(
1616 "human identity disclosure requested but agent certificate is missing",
1617 ))
1618 })?;
1619 (Some(user_id), Some(cert))
1620 } else {
1621 (None, None)
1622 };
1623
1624 let machine_public_key = self
1625 .identity
1626 .machine_keypair()
1627 .public_key()
1628 .as_bytes()
1629 .to_vec();
1630
1631 let unsigned = IdentityAnnouncementUnsigned {
1635 agent_id: self.agent_id(),
1636 machine_id: self.machine_id(),
1637 user_id,
1638 agent_certificate: agent_certificate.clone(),
1639 machine_public_key: machine_public_key.clone(),
1640 addresses,
1641 announced_at: Self::unix_timestamp_secs(),
1642 nat_type: None,
1643 can_receive_direct: None,
1644 is_relay: None,
1645 is_coordinator: None,
1646 };
1647 let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
1648 error::IdentityError::Serialization(format!(
1649 "failed to serialize unsigned identity announcement: {e}"
1650 ))
1651 })?;
1652 let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
1653 self.identity.machine_keypair().secret_key(),
1654 &unsigned_bytes,
1655 )
1656 .map_err(|e| {
1657 error::IdentityError::Storage(std::io::Error::other(format!(
1658 "failed to sign identity announcement with machine key: {:?}",
1659 e
1660 )))
1661 })?
1662 .as_bytes()
1663 .to_vec();
1664
1665 Ok(IdentityAnnouncement {
1666 agent_id: unsigned.agent_id,
1667 machine_id: unsigned.machine_id,
1668 user_id: unsigned.user_id,
1669 agent_certificate: unsigned.agent_certificate,
1670 machine_public_key,
1671 machine_signature,
1672 addresses: unsigned.addresses,
1673 announced_at: unsigned.announced_at,
1674 nat_type: unsigned.nat_type,
1675 can_receive_direct: unsigned.can_receive_direct,
1676 is_relay: unsigned.is_relay,
1677 is_coordinator: unsigned.is_coordinator,
1678 })
1679 }
1680
1681 pub async fn join_network(&self) -> error::Result<()> {
1690 let Some(network) = self.network.as_ref() else {
1691 tracing::debug!("join_network called but no network configured");
1692 return Ok(());
1693 };
1694
1695 if let Some(ref runtime) = self.gossip_runtime {
1696 runtime.start().await.map_err(|e| {
1697 error::IdentityError::Storage(std::io::Error::other(format!(
1698 "failed to start gossip runtime: {e}"
1699 )))
1700 })?;
1701 tracing::info!("Gossip runtime started");
1702 }
1703 self.start_identity_listener().await?;
1704 self.start_direct_listener();
1705
1706 let bootstrap_nodes = network.config().bootstrap_nodes.clone();
1707
1708 let min_connected = 3;
1709 let mut all_connected: Vec<std::net::SocketAddr> = Vec::new();
1710
1711 if let Some(ref cache) = self.bootstrap_cache {
1715 let coordinators = cache.select_coordinators(6).await;
1716 let coordinator_addrs: Vec<std::net::SocketAddr> = coordinators
1717 .iter()
1718 .flat_map(|peer| peer.addresses.clone())
1719 .collect();
1720
1721 if !coordinator_addrs.is_empty() {
1722 tracing::info!(
1723 "Phase 0: Trying {} addresses from {} cached coordinators",
1724 coordinator_addrs.len(),
1725 coordinators.len()
1726 );
1727 let (succeeded, _failed) = self
1728 .connect_peers_parallel_tracked(network, &coordinator_addrs)
1729 .await;
1730 all_connected.extend(&succeeded);
1731 tracing::info!(
1732 "Phase 0: {}/{} coordinator addresses connected",
1733 succeeded.len(),
1734 coordinator_addrs.len()
1735 );
1736 }
1737 }
1738
1739 if all_connected.len() < min_connected {
1741 if let Some(ref cache) = self.bootstrap_cache {
1742 const PHASE1_PEER_CANDIDATES: usize = 12;
1743 let cached_peers = cache.select_peers(PHASE1_PEER_CANDIDATES).await;
1744 if !cached_peers.is_empty() {
1745 tracing::info!("Phase 1: Trying {} cached peers", cached_peers.len());
1746 let (succeeded, _failed) = self
1747 .connect_cached_peers_parallel_tracked(network, &cached_peers)
1748 .await;
1749 all_connected.extend(&succeeded);
1750 tracing::info!(
1751 "Phase 1: {}/{} cached peers connected",
1752 succeeded.len(),
1753 cached_peers.len()
1754 );
1755 }
1756 }
1757 } if all_connected.len() < min_connected && !bootstrap_nodes.is_empty() {
1762 let remaining: Vec<std::net::SocketAddr> = bootstrap_nodes
1763 .iter()
1764 .filter(|addr| !all_connected.contains(addr))
1765 .copied()
1766 .collect();
1767
1768 let (succeeded, mut failed) = self
1770 .connect_peers_parallel_tracked(network, &remaining)
1771 .await;
1772 all_connected.extend(&succeeded);
1773 tracing::info!(
1774 "Phase 2 round 1: {}/{} bootstrap peers connected",
1775 succeeded.len(),
1776 remaining.len()
1777 );
1778
1779 for round in 2..=3 {
1781 if failed.is_empty() {
1782 break;
1783 }
1784 let delay = std::time::Duration::from_secs(if round == 2 { 10 } else { 15 });
1785 tracing::info!(
1786 "Retrying {} failed peers in {}s (round {})",
1787 failed.len(),
1788 delay.as_secs(),
1789 round
1790 );
1791 tokio::time::sleep(delay).await;
1792
1793 let (succeeded, still_failed) =
1794 self.connect_peers_parallel_tracked(network, &failed).await;
1795 all_connected.extend(&succeeded);
1796 failed = still_failed;
1797 tracing::info!(
1798 "Phase 2 round {}: {} total peers connected",
1799 round,
1800 all_connected.len()
1801 );
1802 }
1803
1804 if !failed.is_empty() {
1805 tracing::warn!(
1806 "Could not connect to {} bootstrap peers: {:?}",
1807 failed.len(),
1808 failed
1809 );
1810 }
1811 }
1812
1813 tracing::info!(
1814 "Network join complete. Connected to {} peers.",
1815 all_connected.len()
1816 );
1817
1818 if let Some(ref runtime) = self.gossip_runtime {
1820 let seeds: Vec<String> = all_connected.iter().map(|addr| addr.to_string()).collect();
1821 if !seeds.is_empty() {
1822 if let Err(e) = runtime.membership().join(seeds).await {
1823 tracing::warn!("HyParView membership join failed: {e}");
1824 }
1825 }
1826 }
1827
1828 if let Some(ref pw) = self.presence {
1830 if let Some(ref runtime) = self.gossip_runtime {
1832 let active = runtime.membership().active_view();
1833 for peer in active {
1834 pw.manager().add_broadcast_peer(peer).await;
1835 }
1836 tracing::info!(
1837 "Presence seeded with {} broadcast peers",
1838 pw.manager().broadcast_peer_count().await
1839 );
1840 }
1841
1842 if let Some(ref net) = self.network {
1844 if let Some(status) = net.node_status().await {
1845 let mut hints: Vec<String> = status
1846 .external_addrs
1847 .iter()
1848 .map(|a| a.to_string())
1849 .collect();
1850 hints.push(status.local_addr.to_string());
1851 pw.manager().set_addr_hints(hints).await;
1852 }
1853 }
1854
1855 if pw.config().enable_beacons {
1856 if let Err(e) = pw
1857 .manager()
1858 .start_beacons(pw.config().beacon_interval_secs)
1859 .await
1860 {
1861 tracing::warn!("Failed to start presence beacons: {e}");
1862 } else {
1863 tracing::info!(
1864 "Presence beacons started (interval={}s)",
1865 pw.config().beacon_interval_secs
1866 );
1867 }
1868 }
1869
1870 pw.start_event_loop(std::sync::Arc::clone(&self.identity_discovery_cache))
1874 .await;
1875 tracing::debug!("Presence event loop started");
1876 }
1877
1878 if let Err(e) = self.announce_identity(false, false).await {
1879 tracing::warn!("Initial identity announcement failed: {}", e);
1880 }
1881 if let Err(e) = self.start_identity_heartbeat().await {
1882 tracing::warn!("Failed to start identity heartbeat: {e}");
1883 }
1884
1885 Ok(())
1886 }
1887
1888 async fn connect_cached_peers_parallel_tracked(
1890 &self,
1891 network: &std::sync::Arc<network::NetworkNode>,
1892 peers: &[ant_quic::CachedPeer],
1893 ) -> (Vec<std::net::SocketAddr>, Vec<ant_quic::PeerId>) {
1894 let handles: Vec<_> = peers
1895 .iter()
1896 .map(|peer| {
1897 let net = network.clone();
1898 let peer_id = peer.peer_id;
1899 tokio::spawn(async move {
1900 tracing::debug!("Connecting to cached peer: {:?}", peer_id);
1901 match net.connect_cached_peer(peer_id).await {
1902 Ok(addr) => {
1903 tracing::info!("Connected to cached peer {:?} at {}", peer_id, addr);
1904 Ok(addr)
1905 }
1906 Err(e) => {
1907 tracing::warn!("Failed to connect to cached peer {:?}: {}", peer_id, e);
1908 Err(peer_id)
1909 }
1910 }
1911 })
1912 })
1913 .collect();
1914
1915 let mut succeeded = Vec::new();
1916 let mut failed = Vec::new();
1917 for handle in handles {
1918 match handle.await {
1919 Ok(Ok(addr)) => succeeded.push(addr),
1920 Ok(Err(peer_id)) => failed.push(peer_id),
1921 Err(e) => tracing::error!("Connection task panicked: {}", e),
1922 }
1923 }
1924 (succeeded, failed)
1925 }
1926
1927 async fn connect_peers_parallel_tracked(
1929 &self,
1930 network: &std::sync::Arc<network::NetworkNode>,
1931 addrs: &[std::net::SocketAddr],
1932 ) -> (Vec<std::net::SocketAddr>, Vec<std::net::SocketAddr>) {
1933 let handles: Vec<_> = addrs
1934 .iter()
1935 .map(|addr| {
1936 let net = network.clone();
1937 let addr = *addr;
1938 tokio::spawn(async move {
1939 tracing::debug!("Connecting to peer: {}", addr);
1940 match net.connect_addr(addr).await {
1941 Ok(_) => {
1942 tracing::info!("Connected to peer: {}", addr);
1943 Ok(addr)
1944 }
1945 Err(e) => {
1946 tracing::warn!("Failed to connect to {}: {}", addr, e);
1947 Err(addr)
1948 }
1949 }
1950 })
1951 })
1952 .collect();
1953
1954 let mut succeeded = Vec::new();
1955 let mut failed = Vec::new();
1956 for handle in handles {
1957 match handle.await {
1958 Ok(Ok(addr)) => succeeded.push(addr),
1959 Ok(Err(addr)) => failed.push(addr),
1960 Err(e) => tracing::error!("Connection task panicked: {}", e),
1961 }
1962 }
1963 (succeeded, failed)
1964 }
1965
1966 pub async fn subscribe(&self, topic: &str) -> error::Result<Subscription> {
1976 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1977 error::IdentityError::Storage(std::io::Error::other(
1978 "gossip runtime not initialized - configure agent with network first",
1979 ))
1980 })?;
1981 Ok(runtime.pubsub().subscribe(topic.to_string()).await)
1982 }
1983
1984 pub async fn publish(&self, topic: &str, payload: Vec<u8>) -> error::Result<()> {
1996 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1997 error::IdentityError::Storage(std::io::Error::other(
1998 "gossip runtime not initialized - configure agent with network first",
1999 ))
2000 })?;
2001 runtime
2002 .pubsub()
2003 .publish(topic.to_string(), bytes::Bytes::from(payload))
2004 .await
2005 .map_err(|e| {
2006 error::IdentityError::Storage(std::io::Error::other(format!(
2007 "publish failed: {}",
2008 e
2009 )))
2010 })
2011 }
2012
2013 pub async fn peers(&self) -> error::Result<Vec<saorsa_gossip_types::PeerId>> {
2021 let network = self.network.as_ref().ok_or_else(|| {
2022 error::IdentityError::Storage(std::io::Error::other(
2023 "network not initialized - configure agent with network first",
2024 ))
2025 })?;
2026 let ant_peers = network.connected_peers().await;
2027 Ok(ant_peers
2028 .into_iter()
2029 .map(|p| saorsa_gossip_types::PeerId::new(p.0))
2030 .collect())
2031 }
2032
2033 pub async fn presence(&self) -> error::Result<Vec<identity::AgentId>> {
2041 self.start_identity_listener().await?;
2042 let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
2043 let mut agents: Vec<_> = self
2044 .identity_discovery_cache
2045 .read()
2046 .await
2047 .values()
2048 .filter(|a| a.announced_at >= cutoff)
2049 .map(|a| a.agent_id)
2050 .collect();
2051 agents.sort_by(|a, b| a.0.cmp(&b.0));
2052 Ok(agents)
2053 }
2054
2055 pub async fn subscribe_presence(
2069 &self,
2070 ) -> error::NetworkResult<tokio::sync::broadcast::Receiver<presence::PresenceEvent>> {
2071 let pw = self.presence.as_ref().ok_or_else(|| {
2072 error::NetworkError::NodeError("presence system not initialized".to_string())
2073 })?;
2074 pw.start_event_loop(std::sync::Arc::clone(&self.identity_discovery_cache))
2076 .await;
2077 Ok(pw.subscribe_events())
2078 }
2079
2080 pub async fn cached_agent(&self, id: &identity::AgentId) -> Option<DiscoveredAgent> {
2086 self.identity_discovery_cache.read().await.get(id).cloned()
2087 }
2088
2089 pub async fn discover_agents_foaf(
2108 &self,
2109 ttl: u8,
2110 timeout_ms: u64,
2111 ) -> error::NetworkResult<Vec<DiscoveredAgent>> {
2112 let pw = self.presence.as_ref().ok_or_else(|| {
2113 error::NetworkError::NodeError("presence system not initialized".to_string())
2114 })?;
2115
2116 let topic = presence::global_presence_topic();
2117 let raw_results: Vec<(
2118 saorsa_gossip_types::PeerId,
2119 saorsa_gossip_types::PresenceRecord,
2120 )> = pw
2121 .manager()
2122 .initiate_foaf_query(topic, ttl, timeout_ms)
2123 .await
2124 .map_err(|e| error::NetworkError::NodeError(e.to_string()))?;
2125
2126 let cache = self.identity_discovery_cache.read().await;
2127
2128 let mut seen: std::collections::HashSet<identity::AgentId> =
2130 std::collections::HashSet::new();
2131 let mut agents: Vec<DiscoveredAgent> = Vec::with_capacity(raw_results.len());
2132
2133 for (peer_id, record) in &raw_results {
2134 if let Some(agent) =
2135 presence::presence_record_to_discovered_agent(*peer_id, record, &cache)
2136 {
2137 if seen.insert(agent.agent_id) {
2138 agents.push(agent);
2139 }
2140 }
2141 }
2142
2143 Ok(agents)
2144 }
2145
2146 pub async fn discover_agent_by_id(
2160 &self,
2161 target_id: identity::AgentId,
2162 ttl: u8,
2163 timeout_ms: u64,
2164 ) -> error::NetworkResult<Option<DiscoveredAgent>> {
2165 {
2167 let cache = self.identity_discovery_cache.read().await;
2168 if let Some(agent) = cache.get(&target_id) {
2169 return Ok(Some(agent.clone()));
2170 }
2171 }
2172
2173 let agents = self.discover_agents_foaf(ttl, timeout_ms).await?;
2175 Ok(agents.into_iter().find(|a| a.agent_id == target_id))
2176 }
2177
2178 pub async fn find_agent(
2195 &self,
2196 agent_id: identity::AgentId,
2197 ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
2198 self.start_identity_listener().await?;
2199
2200 if let Some(addrs) = self
2202 .identity_discovery_cache
2203 .read()
2204 .await
2205 .get(&agent_id)
2206 .map(|e| e.addresses.clone())
2207 {
2208 return Ok(Some(addrs));
2209 }
2210
2211 let runtime = match self.gossip_runtime.as_ref() {
2213 Some(r) => r,
2214 None => return Ok(None),
2215 };
2216 let shard_topic = shard_topic_for_agent(&agent_id);
2217 let mut sub = runtime.pubsub().subscribe(shard_topic).await;
2218 let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
2219 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
2220
2221 loop {
2222 if tokio::time::Instant::now() >= deadline {
2223 break;
2224 }
2225 let timeout = tokio::time::sleep_until(deadline);
2226 tokio::select! {
2227 Some(msg) = sub.recv() => {
2228 if let Ok(ann) = {
2229 use bincode::Options;
2230 bincode::DefaultOptions::new()
2231 .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
2232 .deserialize::<IdentityAnnouncement>(&msg.payload)
2233 } {
2234 if ann.verify().is_ok() && ann.agent_id == agent_id {
2235 let now = std::time::SystemTime::now()
2236 .duration_since(std::time::UNIX_EPOCH)
2237 .map_or(0, |d| d.as_secs());
2238 let addrs = ann.addresses.clone();
2239 cache.write().await.insert(
2240 ann.agent_id,
2241 DiscoveredAgent {
2242 agent_id: ann.agent_id,
2243 machine_id: ann.machine_id,
2244 user_id: ann.user_id,
2245 addresses: ann.addresses,
2246 announced_at: ann.announced_at,
2247 last_seen: now,
2248 machine_public_key: ann.machine_public_key.clone(),
2249 nat_type: ann.nat_type.clone(),
2250 can_receive_direct: ann.can_receive_direct,
2251 is_relay: ann.is_relay,
2252 is_coordinator: ann.is_coordinator,
2253 },
2254 );
2255 return Ok(Some(addrs));
2256 }
2257 }
2258 }
2259 _ = timeout => break,
2260 }
2261 }
2262
2263 if let Some(addrs) = self.find_agent_rendezvous(agent_id, 5).await? {
2266 let now = std::time::SystemTime::now()
2267 .duration_since(std::time::UNIX_EPOCH)
2268 .map_or(0, |d| d.as_secs());
2269 cache
2270 .write()
2271 .await
2272 .entry(agent_id)
2273 .or_insert_with(|| DiscoveredAgent {
2274 agent_id,
2275 machine_id: identity::MachineId([0u8; 32]),
2276 user_id: None,
2277 addresses: addrs.clone(),
2278 announced_at: now,
2279 last_seen: now,
2280 machine_public_key: Vec::new(),
2281 nat_type: None,
2282 can_receive_direct: None,
2283 is_relay: None,
2284 is_coordinator: None,
2285 });
2286 return Ok(Some(addrs));
2287 }
2288
2289 Ok(None)
2290 }
2291
2292 pub async fn find_agents_by_user(
2305 &self,
2306 user_id: identity::UserId,
2307 ) -> error::Result<Vec<DiscoveredAgent>> {
2308 self.start_identity_listener().await?;
2309 let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
2310 Ok(self
2311 .identity_discovery_cache
2312 .read()
2313 .await
2314 .values()
2315 .filter(|a| a.announced_at >= cutoff && a.user_id == Some(user_id))
2316 .cloned()
2317 .collect())
2318 }
2319
2320 #[must_use]
2325 pub fn local_addr(&self) -> Option<std::net::SocketAddr> {
2326 self.network.as_ref().and_then(|n| n.local_addr())
2327 }
2328
2329 pub fn build_announcement(
2337 &self,
2338 include_user: bool,
2339 consent: bool,
2340 ) -> error::Result<IdentityAnnouncement> {
2341 self.build_identity_announcement(include_user, consent)
2342 }
2343
2344 fn start_direct_listener(&self) {
2357 if self
2358 .direct_listener_started
2359 .swap(true, std::sync::atomic::Ordering::AcqRel)
2360 {
2361 return;
2362 }
2363
2364 let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
2365 return;
2366 };
2367 let dm = std::sync::Arc::clone(&self.direct_messaging);
2368
2369 tokio::spawn(async move {
2370 tracing::info!("Direct message listener started");
2371 loop {
2372 let Some((ant_peer_id, payload)) = network.recv_direct().await else {
2373 tracing::debug!("Direct message channel closed");
2374 break;
2375 };
2376
2377 if payload.len() < 32 {
2379 tracing::warn!("Direct message too short ({} bytes)", payload.len());
2380 continue;
2381 }
2382
2383 let mut sender_bytes = [0u8; 32];
2384 sender_bytes.copy_from_slice(&payload[..32]);
2385 let sender = identity::AgentId(sender_bytes);
2386 let machine_id = identity::MachineId(ant_peer_id.0);
2387 let data = payload[32..].to_vec();
2388
2389 dm.register_agent(sender, machine_id).await;
2391
2392 dm.handle_incoming(machine_id, sender, data).await;
2394 }
2395 });
2396 }
2397
2398 pub async fn start_identity_heartbeat(&self) -> error::Result<()> {
2406 let mut handle_guard = self.heartbeat_handle.lock().await;
2407 if handle_guard.is_some() {
2408 return Ok(());
2409 }
2410 let Some(runtime) = self.gossip_runtime.as_ref().map(std::sync::Arc::clone) else {
2411 return Err(error::IdentityError::Storage(std::io::Error::other(
2412 "gossip runtime not initialized — cannot start heartbeat",
2413 )));
2414 };
2415 let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
2416 return Err(error::IdentityError::Storage(std::io::Error::other(
2417 "network not initialized — cannot start heartbeat",
2418 )));
2419 };
2420 let ctx = HeartbeatContext {
2421 identity: std::sync::Arc::clone(&self.identity),
2422 runtime,
2423 network,
2424 interval_secs: self.heartbeat_interval_secs,
2425 cache: std::sync::Arc::clone(&self.identity_discovery_cache),
2426 };
2427 let handle = tokio::task::spawn(async move {
2428 let mut ticker =
2429 tokio::time::interval(std::time::Duration::from_secs(ctx.interval_secs));
2430 ticker.tick().await; loop {
2432 ticker.tick().await;
2433 if let Err(e) = ctx.announce().await {
2434 tracing::warn!("identity heartbeat announce failed: {e}");
2435 }
2436 }
2437 });
2438 *handle_guard = Some(handle);
2439 Ok(())
2440 }
2441
2442 pub async fn advertise_identity(&self, validity_ms: u64) -> error::Result<()> {
2472 use saorsa_gossip_rendezvous::{Capability, ProviderSummary};
2473
2474 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2475 error::IdentityError::Storage(std::io::Error::other(
2476 "gossip runtime not initialized — cannot advertise identity",
2477 ))
2478 })?;
2479
2480 let peer_id = runtime.peer_id();
2481 let addresses = self.announcement_addresses();
2482 let addr_bytes = bincode::serialize(&addresses).map_err(|e| {
2483 error::IdentityError::Serialization(format!(
2484 "failed to serialize addresses for rendezvous: {e}"
2485 ))
2486 })?;
2487
2488 let mut summary = ProviderSummary::new(
2489 self.agent_id().0,
2490 peer_id,
2491 vec![Capability::Identity],
2492 validity_ms,
2493 )
2494 .with_extensions(addr_bytes);
2495
2496 summary
2497 .sign_raw(self.identity.machine_keypair().secret_key().as_bytes())
2498 .map_err(|e| {
2499 error::IdentityError::Storage(std::io::Error::other(format!(
2500 "failed to sign rendezvous summary: {e}"
2501 )))
2502 })?;
2503
2504 let cbor_bytes = summary.to_cbor().map_err(|e| {
2505 error::IdentityError::Serialization(format!(
2506 "failed to CBOR-encode rendezvous summary: {e}"
2507 ))
2508 })?;
2509
2510 let topic = rendezvous_shard_topic_for_agent(&self.agent_id());
2511 runtime
2512 .pubsub()
2513 .publish(topic, bytes::Bytes::from(cbor_bytes))
2514 .await
2515 .map_err(|e| {
2516 error::IdentityError::Storage(std::io::Error::other(format!(
2517 "failed to publish rendezvous summary: {e}"
2518 )))
2519 })?;
2520
2521 self.rendezvous_advertised
2522 .store(true, std::sync::atomic::Ordering::Relaxed);
2523 Ok(())
2524 }
2525
2526 pub async fn find_agent_rendezvous(
2539 &self,
2540 agent_id: identity::AgentId,
2541 timeout_secs: u64,
2542 ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
2543 use saorsa_gossip_rendezvous::ProviderSummary;
2544
2545 let runtime = match self.gossip_runtime.as_ref() {
2546 Some(r) => r,
2547 None => return Ok(None),
2548 };
2549
2550 let topic = rendezvous_shard_topic_for_agent(&agent_id);
2551 let mut sub = runtime.pubsub().subscribe(topic).await;
2552 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
2553
2554 loop {
2555 if tokio::time::Instant::now() >= deadline {
2556 break;
2557 }
2558 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
2559 tokio::select! {
2560 Some(msg) = sub.recv() => {
2561 let summary = match ProviderSummary::from_cbor(&msg.payload) {
2562 Ok(s) => s,
2563 Err(_) => continue,
2564 };
2565 if summary.target != agent_id.0 {
2566 continue;
2567 }
2568 let cached_pub = self
2574 .identity_discovery_cache
2575 .read()
2576 .await
2577 .get(&agent_id)
2578 .map(|e| e.machine_public_key.clone());
2579 if let Some(pub_bytes) = cached_pub {
2580 if !pub_bytes.is_empty()
2581 && !summary.verify_raw(&pub_bytes).unwrap_or(false)
2582 {
2583 tracing::warn!(
2584 "Rendezvous summary signature verification failed for agent {:?}; discarding",
2585 agent_id
2586 );
2587 continue;
2588 }
2589 }
2590 let addrs: Vec<std::net::SocketAddr> = summary
2592 .extensions
2593 .as_deref()
2594 .and_then(|b| {
2595 use bincode::Options;
2596 bincode::DefaultOptions::new()
2597 .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
2598 .deserialize(b)
2599 .ok()
2600 })
2601 .unwrap_or_default();
2602 if !addrs.is_empty() {
2603 return Ok(Some(addrs));
2604 }
2605 }
2606 _ = tokio::time::sleep(remaining) => break,
2607 }
2608 }
2609
2610 Ok(None)
2611 }
2612
2613 #[doc(hidden)]
2619 pub async fn insert_discovered_agent_for_testing(&self, agent: DiscoveredAgent) {
2620 self.identity_discovery_cache
2621 .write()
2622 .await
2623 .insert(agent.agent_id, agent);
2624 }
2625
2626 pub async fn create_task_list(&self, name: &str, topic: &str) -> error::Result<TaskListHandle> {
2650 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2651 error::IdentityError::Storage(std::io::Error::other(
2652 "gossip runtime not initialized - configure agent with network first",
2653 ))
2654 })?;
2655
2656 let peer_id = runtime.peer_id();
2657 let list_id = crdt::TaskListId::from_content(name, &self.agent_id(), 0);
2658 let task_list = crdt::TaskList::new(list_id, name.to_string(), peer_id);
2659
2660 let sync = crdt::TaskListSync::new(
2661 task_list,
2662 std::sync::Arc::clone(runtime.pubsub()),
2663 topic.to_string(),
2664 30,
2665 )
2666 .map_err(|e| {
2667 error::IdentityError::Storage(std::io::Error::other(format!(
2668 "task list sync creation failed: {}",
2669 e
2670 )))
2671 })?;
2672
2673 let sync = std::sync::Arc::new(sync);
2674 sync.start().await.map_err(|e| {
2675 error::IdentityError::Storage(std::io::Error::other(format!(
2676 "task list sync start failed: {}",
2677 e
2678 )))
2679 })?;
2680
2681 Ok(TaskListHandle {
2682 sync,
2683 agent_id: self.agent_id(),
2684 peer_id,
2685 })
2686 }
2687
2688 pub async fn join_task_list(&self, topic: &str) -> error::Result<TaskListHandle> {
2711 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2712 error::IdentityError::Storage(std::io::Error::other(
2713 "gossip runtime not initialized - configure agent with network first",
2714 ))
2715 })?;
2716
2717 let peer_id = runtime.peer_id();
2718 let list_id = crdt::TaskListId::from_content(topic, &self.agent_id(), 0);
2720 let task_list = crdt::TaskList::new(list_id, String::new(), peer_id);
2721
2722 let sync = crdt::TaskListSync::new(
2723 task_list,
2724 std::sync::Arc::clone(runtime.pubsub()),
2725 topic.to_string(),
2726 30,
2727 )
2728 .map_err(|e| {
2729 error::IdentityError::Storage(std::io::Error::other(format!(
2730 "task list sync creation failed: {}",
2731 e
2732 )))
2733 })?;
2734
2735 let sync = std::sync::Arc::new(sync);
2736 sync.start().await.map_err(|e| {
2737 error::IdentityError::Storage(std::io::Error::other(format!(
2738 "task list sync start failed: {}",
2739 e
2740 )))
2741 })?;
2742
2743 Ok(TaskListHandle {
2744 sync,
2745 agent_id: self.agent_id(),
2746 peer_id,
2747 })
2748 }
2749}
2750
2751impl AgentBuilder {
2752 pub fn with_machine_key<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2764 self.machine_key_path = Some(path.as_ref().to_path_buf());
2765 self
2766 }
2767
2768 pub fn with_agent_key(mut self, keypair: identity::AgentKeypair) -> Self {
2787 self.agent_keypair = Some(keypair);
2788 self
2789 }
2790
2791 pub fn with_agent_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2806 self.agent_key_path = Some(path.as_ref().to_path_buf());
2807 self
2808 }
2809
2810 pub fn with_network_config(mut self, config: network::NetworkConfig) -> Self {
2822 self.network_config = Some(config);
2823 self
2824 }
2825
2826 pub fn with_peer_cache_dir<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2832 self.peer_cache_dir = Some(path.as_ref().to_path_buf());
2833 self
2834 }
2835
2836 pub fn with_user_key(mut self, keypair: identity::UserKeypair) -> Self {
2854 self.user_keypair = Some(keypair);
2855 self
2856 }
2857
2858 pub fn with_user_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2874 self.user_key_path = Some(path.as_ref().to_path_buf());
2875 self
2876 }
2877
2878 #[must_use]
2886 pub fn with_heartbeat_interval(mut self, secs: u64) -> Self {
2887 self.heartbeat_interval_secs = Some(secs);
2888 self
2889 }
2890
2891 #[must_use]
2902 pub fn with_identity_ttl(mut self, secs: u64) -> Self {
2903 self.identity_ttl_secs = Some(secs);
2904 self
2905 }
2906
2907 #[must_use]
2916 pub fn with_contact_store_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2917 self.contact_store_path = Some(path.as_ref().to_path_buf());
2918 self
2919 }
2920
2921 pub async fn build(self) -> error::Result<Agent> {
2937 let machine_keypair = if let Some(path) = self.machine_key_path {
2939 match storage::load_machine_keypair_from(&path).await {
2941 Ok(kp) => kp,
2942 Err(_) => {
2943 let kp = identity::MachineKeypair::generate()?;
2945 storage::save_machine_keypair_to(&kp, &path).await?;
2946 kp
2947 }
2948 }
2949 } else if storage::machine_keypair_exists().await {
2950 storage::load_machine_keypair().await?
2952 } else {
2953 let kp = identity::MachineKeypair::generate()?;
2955 storage::save_machine_keypair(&kp).await?;
2956 kp
2957 };
2958
2959 let agent_keypair = if let Some(kp) = self.agent_keypair {
2961 kp
2963 } else if let Some(path) = self.agent_key_path {
2964 match storage::load_agent_keypair_from(&path).await {
2966 Ok(kp) => kp,
2967 Err(_) => {
2968 let kp = identity::AgentKeypair::generate()?;
2969 storage::save_agent_keypair_to(&kp, &path).await?;
2970 kp
2971 }
2972 }
2973 } else if storage::agent_keypair_exists().await {
2974 storage::load_agent_keypair_default().await?
2976 } else {
2977 let kp = identity::AgentKeypair::generate()?;
2979 storage::save_agent_keypair_default(&kp).await?;
2980 kp
2981 };
2982
2983 let user_keypair = if let Some(kp) = self.user_keypair {
2985 Some(kp)
2986 } else if let Some(path) = self.user_key_path {
2987 storage::load_user_keypair_from(&path).await.ok()
2989 } else if storage::user_keypair_exists().await {
2990 storage::load_user_keypair().await.ok()
2992 } else {
2993 None
2994 };
2995
2996 let identity = if let Some(user_kp) = user_keypair {
2998 let cert = if storage::agent_certificate_exists().await {
3001 match storage::load_agent_certificate().await {
3002 Ok(c) => {
3003 let cert_matches_user = c
3005 .user_id()
3006 .map(|uid| uid == user_kp.user_id())
3007 .unwrap_or(false);
3008 if cert_matches_user {
3009 c
3010 } else {
3011 let new_cert =
3013 identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
3014 storage::save_agent_certificate(&new_cert).await?;
3015 new_cert
3016 }
3017 }
3018 Err(_) => {
3019 let c = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
3020 storage::save_agent_certificate(&c).await?;
3021 c
3022 }
3023 }
3024 } else {
3025 let c = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
3026 storage::save_agent_certificate(&c).await?;
3027 c
3028 };
3029 identity::Identity::new_with_user(machine_keypair, agent_keypair, user_kp, cert)
3030 } else {
3031 identity::Identity::new(machine_keypair, agent_keypair)
3032 };
3033
3034 let bootstrap_cache = if self.network_config.is_some() {
3036 let cache_dir = self.peer_cache_dir.unwrap_or_else(|| {
3037 dirs::home_dir()
3038 .unwrap_or_else(|| std::path::PathBuf::from("."))
3039 .join(".x0x")
3040 .join("peers")
3041 });
3042 let config = ant_quic::BootstrapCacheConfig::builder()
3043 .cache_dir(cache_dir)
3044 .min_peers_to_save(1)
3045 .build();
3046 match ant_quic::BootstrapCache::open(config).await {
3047 Ok(cache) => {
3048 let cache = std::sync::Arc::new(cache);
3049 std::sync::Arc::clone(&cache).start_maintenance();
3050 Some(cache)
3051 }
3052 Err(e) => {
3053 tracing::warn!("Failed to open bootstrap cache: {e}");
3054 None
3055 }
3056 }
3057 } else {
3058 None
3059 };
3060
3061 let machine_keypair = {
3064 let pk = ant_quic::MlDsaPublicKey::from_bytes(
3065 identity.machine_keypair().public_key().as_bytes(),
3066 )
3067 .map_err(|e| {
3068 error::IdentityError::Storage(std::io::Error::other(format!(
3069 "invalid machine public key: {e}"
3070 )))
3071 })?;
3072 let sk = ant_quic::MlDsaSecretKey::from_bytes(
3073 identity.machine_keypair().secret_key().as_bytes(),
3074 )
3075 .map_err(|e| {
3076 error::IdentityError::Storage(std::io::Error::other(format!(
3077 "invalid machine secret key: {e}"
3078 )))
3079 })?;
3080 Some((pk, sk))
3081 };
3082
3083 let network = if let Some(config) = self.network_config {
3084 let node = network::NetworkNode::new(config, bootstrap_cache.clone(), machine_keypair)
3085 .await
3086 .map_err(|e| {
3087 error::IdentityError::Storage(std::io::Error::other(format!(
3088 "network initialization failed: {}",
3089 e
3090 )))
3091 })?;
3092
3093 debug_assert_eq!(
3095 node.peer_id().0,
3096 identity.machine_id().0,
3097 "ant-quic PeerId must equal MachineId after identity unification"
3098 );
3099
3100 Some(std::sync::Arc::new(node))
3101 } else {
3102 None
3103 };
3104
3105 let signing_ctx = std::sync::Arc::new(gossip::SigningContext::from_keypair(
3107 identity.agent_keypair(),
3108 ));
3109
3110 let gossip_runtime = if let Some(ref net) = network {
3112 let runtime = gossip::GossipRuntime::new(
3113 gossip::GossipConfig::default(),
3114 std::sync::Arc::clone(net),
3115 Some(signing_ctx),
3116 )
3117 .await
3118 .map_err(|e| {
3119 error::IdentityError::Storage(std::io::Error::other(format!(
3120 "gossip runtime initialization failed: {}",
3121 e
3122 )))
3123 })?;
3124 Some(std::sync::Arc::new(runtime))
3125 } else {
3126 None
3127 };
3128
3129 let contacts_path = self.contact_store_path.unwrap_or_else(|| {
3131 dirs::home_dir()
3132 .unwrap_or_else(|| std::path::PathBuf::from("."))
3133 .join(".x0x")
3134 .join("contacts.json")
3135 });
3136 let contact_store = std::sync::Arc::new(tokio::sync::RwLock::new(
3137 contacts::ContactStore::new(contacts_path),
3138 ));
3139
3140 let gossip_cache_adapter = bootstrap_cache.as_ref().map(|cache| {
3142 saorsa_gossip_coordinator::GossipCacheAdapter::new(std::sync::Arc::clone(cache))
3143 });
3144
3145 let direct_messaging = std::sync::Arc::new(direct::DirectMessaging::new());
3147
3148 let presence = if let Some(ref net) = network {
3150 let peer_id = saorsa_gossip_transport::GossipTransport::local_peer_id(net.as_ref());
3151 let pw = presence::PresenceWrapper::new(
3152 peer_id,
3153 std::sync::Arc::clone(net),
3154 presence::PresenceConfig::default(),
3155 bootstrap_cache.clone(),
3156 )
3157 .map_err(|e| {
3158 error::IdentityError::Storage(std::io::Error::other(format!(
3159 "presence initialization failed: {}",
3160 e
3161 )))
3162 })?;
3163 let pw_arc = std::sync::Arc::new(pw);
3164 if let Some(ref rt) = gossip_runtime {
3166 rt.set_presence(std::sync::Arc::clone(&pw_arc));
3167 }
3168 Some(pw_arc)
3169 } else {
3170 None
3171 };
3172
3173 Ok(Agent {
3174 identity: std::sync::Arc::new(identity),
3175 network,
3176 gossip_runtime,
3177 bootstrap_cache,
3178 gossip_cache_adapter,
3179 identity_discovery_cache: std::sync::Arc::new(tokio::sync::RwLock::new(
3180 std::collections::HashMap::new(),
3181 )),
3182 identity_listener_started: std::sync::atomic::AtomicBool::new(false),
3183 heartbeat_interval_secs: self
3184 .heartbeat_interval_secs
3185 .unwrap_or(IDENTITY_HEARTBEAT_INTERVAL_SECS),
3186 identity_ttl_secs: self.identity_ttl_secs.unwrap_or(IDENTITY_TTL_SECS),
3187 heartbeat_handle: tokio::sync::Mutex::new(None),
3188 rendezvous_advertised: std::sync::atomic::AtomicBool::new(false),
3189 contact_store,
3190 direct_messaging,
3191 direct_listener_started: std::sync::atomic::AtomicBool::new(false),
3192 presence,
3193 })
3194 }
3195}
3196
3197#[derive(Clone)]
3211pub struct TaskListHandle {
3212 sync: std::sync::Arc<crdt::TaskListSync>,
3213 agent_id: identity::AgentId,
3214 peer_id: saorsa_gossip_types::PeerId,
3215}
3216
3217impl std::fmt::Debug for TaskListHandle {
3218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3219 f.debug_struct("TaskListHandle")
3220 .field("agent_id", &self.agent_id)
3221 .field("peer_id", &self.peer_id)
3222 .finish_non_exhaustive()
3223 }
3224}
3225
3226impl TaskListHandle {
3227 pub async fn add_task(
3242 &self,
3243 title: String,
3244 description: String,
3245 ) -> error::Result<crdt::TaskId> {
3246 let (task_id, delta) = {
3247 let mut list = self.sync.write().await;
3248 let seq = list.next_seq();
3249 let task_id = crdt::TaskId::new(&title, &self.agent_id, seq);
3250 let metadata = crdt::TaskMetadata::new(title, description, 128, self.agent_id, seq);
3251 let task = crdt::TaskItem::new(task_id, metadata, self.peer_id);
3252 list.add_task(task.clone(), self.peer_id, seq)
3253 .map_err(|e| {
3254 error::IdentityError::Storage(std::io::Error::other(format!(
3255 "add_task failed: {}",
3256 e
3257 )))
3258 })?;
3259 let tag = (self.peer_id, seq);
3260 let delta = crdt::TaskListDelta::for_add(task_id, task, tag, list.current_version());
3261 (task_id, delta)
3262 };
3263 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3265 tracing::warn!("failed to publish add_task delta: {}", e);
3266 }
3267 Ok(task_id)
3268 }
3269
3270 pub async fn claim_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
3280 let delta = {
3281 let mut list = self.sync.write().await;
3282 let seq = list.next_seq();
3283 list.claim_task(&task_id, self.agent_id, self.peer_id, seq)
3284 .map_err(|e| {
3285 error::IdentityError::Storage(std::io::Error::other(format!(
3286 "claim_task failed: {}",
3287 e
3288 )))
3289 })?;
3290 let full_task = list
3292 .get_task(&task_id)
3293 .ok_or_else(|| {
3294 error::IdentityError::Storage(std::io::Error::other(
3295 "task disappeared after claim",
3296 ))
3297 })?
3298 .clone();
3299 crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
3300 };
3301 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3302 tracing::warn!("failed to publish claim_task delta: {}", e);
3303 }
3304 Ok(())
3305 }
3306
3307 pub async fn complete_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
3317 let delta = {
3318 let mut list = self.sync.write().await;
3319 let seq = list.next_seq();
3320 list.complete_task(&task_id, self.agent_id, self.peer_id, seq)
3321 .map_err(|e| {
3322 error::IdentityError::Storage(std::io::Error::other(format!(
3323 "complete_task failed: {}",
3324 e
3325 )))
3326 })?;
3327 let full_task = list
3328 .get_task(&task_id)
3329 .ok_or_else(|| {
3330 error::IdentityError::Storage(std::io::Error::other(
3331 "task disappeared after complete",
3332 ))
3333 })?
3334 .clone();
3335 crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
3336 };
3337 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3338 tracing::warn!("failed to publish complete_task delta: {}", e);
3339 }
3340 Ok(())
3341 }
3342
3343 pub async fn list_tasks(&self) -> error::Result<Vec<TaskSnapshot>> {
3353 let list = self.sync.read().await;
3354 let tasks = list.tasks_ordered();
3355 Ok(tasks
3356 .into_iter()
3357 .map(|task| TaskSnapshot {
3358 id: *task.id(),
3359 title: task.title().to_string(),
3360 description: task.description().to_string(),
3361 state: task.current_state(),
3362 assignee: task.assignee().copied(),
3363 owner: None,
3364 priority: task.priority(),
3365 })
3366 .collect())
3367 }
3368
3369 pub async fn reorder(&self, task_ids: Vec<crdt::TaskId>) -> error::Result<()> {
3379 let delta = {
3380 let mut list = self.sync.write().await;
3381 list.reorder(task_ids.clone(), self.peer_id).map_err(|e| {
3382 error::IdentityError::Storage(std::io::Error::other(format!(
3383 "reorder failed: {}",
3384 e
3385 )))
3386 })?;
3387 crdt::TaskListDelta::for_reorder(task_ids, list.current_version())
3388 };
3389 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3390 tracing::warn!("failed to publish reorder delta: {}", e);
3391 }
3392 Ok(())
3393 }
3394}
3395
3396impl Agent {
3401 pub async fn create_kv_store(&self, name: &str, topic: &str) -> error::Result<KvStoreHandle> {
3410 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3411 error::IdentityError::Storage(std::io::Error::other(
3412 "gossip runtime not initialized - configure agent with network first",
3413 ))
3414 })?;
3415
3416 let peer_id = runtime.peer_id();
3417 let store_id = kv::KvStoreId::from_content(name, &self.agent_id());
3418 let store = kv::KvStore::new(
3419 store_id,
3420 name.to_string(),
3421 self.agent_id(),
3422 kv::AccessPolicy::Signed,
3423 );
3424
3425 let sync = kv::KvStoreSync::new(
3426 store,
3427 std::sync::Arc::clone(runtime.pubsub()),
3428 topic.to_string(),
3429 30,
3430 )
3431 .map_err(|e| {
3432 error::IdentityError::Storage(std::io::Error::other(format!(
3433 "kv store sync creation failed: {e}",
3434 )))
3435 })?;
3436
3437 let sync = std::sync::Arc::new(sync);
3438 sync.start().await.map_err(|e| {
3439 error::IdentityError::Storage(std::io::Error::other(format!(
3440 "kv store sync start failed: {e}",
3441 )))
3442 })?;
3443
3444 Ok(KvStoreHandle {
3445 sync,
3446 agent_id: self.agent_id(),
3447 peer_id,
3448 })
3449 }
3450
3451 pub async fn join_kv_store(&self, topic: &str) -> error::Result<KvStoreHandle> {
3461 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3462 error::IdentityError::Storage(std::io::Error::other(
3463 "gossip runtime not initialized - configure agent with network first",
3464 ))
3465 })?;
3466
3467 let peer_id = runtime.peer_id();
3468 let store_id = kv::KvStoreId::from_content(topic, &self.agent_id());
3469 let store = kv::KvStore::new(
3472 store_id,
3473 String::new(),
3474 self.agent_id(),
3475 kv::AccessPolicy::Encrypted {
3476 group_id: Vec::new(),
3477 },
3478 );
3479
3480 let sync = kv::KvStoreSync::new(
3481 store,
3482 std::sync::Arc::clone(runtime.pubsub()),
3483 topic.to_string(),
3484 30,
3485 )
3486 .map_err(|e| {
3487 error::IdentityError::Storage(std::io::Error::other(format!(
3488 "kv store sync creation failed: {e}",
3489 )))
3490 })?;
3491
3492 let sync = std::sync::Arc::new(sync);
3493 sync.start().await.map_err(|e| {
3494 error::IdentityError::Storage(std::io::Error::other(format!(
3495 "kv store sync start failed: {e}",
3496 )))
3497 })?;
3498
3499 Ok(KvStoreHandle {
3500 sync,
3501 agent_id: self.agent_id(),
3502 peer_id,
3503 })
3504 }
3505}
3506
3507#[derive(Clone)]
3512pub struct KvStoreHandle {
3513 sync: std::sync::Arc<kv::KvStoreSync>,
3514 agent_id: identity::AgentId,
3515 peer_id: saorsa_gossip_types::PeerId,
3516}
3517
3518impl std::fmt::Debug for KvStoreHandle {
3519 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3520 f.debug_struct("KvStoreHandle")
3521 .field("agent_id", &self.agent_id)
3522 .field("peer_id", &self.peer_id)
3523 .finish_non_exhaustive()
3524 }
3525}
3526
3527impl KvStoreHandle {
3528 pub async fn put(
3537 &self,
3538 key: String,
3539 value: Vec<u8>,
3540 content_type: String,
3541 ) -> error::Result<()> {
3542 let delta = {
3543 let mut store = self.sync.write().await;
3544 store
3545 .put(
3546 key.clone(),
3547 value.clone(),
3548 content_type.clone(),
3549 self.peer_id,
3550 )
3551 .map_err(|e| {
3552 error::IdentityError::Storage(std::io::Error::other(format!(
3553 "kv put failed: {e}",
3554 )))
3555 })?;
3556 let entry = store.get(&key).cloned();
3557 let version = store.current_version();
3558 match entry {
3559 Some(e) => {
3560 kv::KvStoreDelta::for_put(key, e, (self.peer_id, store.next_seq()), version)
3561 }
3562 None => return Ok(()), }
3564 };
3565 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3566 tracing::warn!("failed to publish kv put delta: {e}");
3567 }
3568 Ok(())
3569 }
3570
3571 pub async fn get(&self, key: &str) -> error::Result<Option<KvEntrySnapshot>> {
3579 let store = self.sync.read().await;
3580 Ok(store.get(key).map(|e| KvEntrySnapshot {
3581 key: e.key.clone(),
3582 value: e.value.clone(),
3583 content_hash: hex::encode(e.content_hash),
3584 content_type: e.content_type.clone(),
3585 metadata: e.metadata.clone(),
3586 created_at: e.created_at,
3587 updated_at: e.updated_at,
3588 }))
3589 }
3590
3591 pub async fn remove(&self, key: &str) -> error::Result<()> {
3597 let delta = {
3598 let mut store = self.sync.write().await;
3599 store.remove(key).map_err(|e| {
3600 error::IdentityError::Storage(std::io::Error::other(format!(
3601 "kv remove failed: {e}",
3602 )))
3603 })?;
3604 let mut d = kv::KvStoreDelta::new(store.current_version());
3605 d.removed
3606 .insert(key.to_string(), std::collections::HashSet::new());
3607 d
3608 };
3609 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3610 tracing::warn!("failed to publish kv remove delta: {e}");
3611 }
3612 Ok(())
3613 }
3614
3615 pub async fn keys(&self) -> error::Result<Vec<KvEntrySnapshot>> {
3621 let store = self.sync.read().await;
3622 Ok(store
3623 .active_entries()
3624 .into_iter()
3625 .map(|e| KvEntrySnapshot {
3626 key: e.key.clone(),
3627 value: e.value.clone(),
3628 content_hash: hex::encode(e.content_hash),
3629 content_type: e.content_type.clone(),
3630 metadata: e.metadata.clone(),
3631 created_at: e.created_at,
3632 updated_at: e.updated_at,
3633 })
3634 .collect())
3635 }
3636
3637 pub async fn name(&self) -> error::Result<String> {
3643 let store = self.sync.read().await;
3644 Ok(store.name().to_string())
3645 }
3646}
3647
3648#[derive(Debug, Clone, serde::Serialize)]
3650pub struct KvEntrySnapshot {
3651 pub key: String,
3653 pub value: Vec<u8>,
3655 pub content_hash: String,
3657 pub content_type: String,
3659 pub metadata: std::collections::HashMap<String, String>,
3661 pub created_at: u64,
3663 pub updated_at: u64,
3665}
3666
3667#[derive(Debug, Clone)]
3672pub struct TaskSnapshot {
3673 pub id: crdt::TaskId,
3675 pub title: String,
3677 pub description: String,
3679 pub state: crdt::CheckboxState,
3681 pub assignee: Option<identity::AgentId>,
3683 pub owner: Option<identity::UserId>,
3685 pub priority: u8,
3687}
3688
3689pub const VERSION: &str = env!("CARGO_PKG_VERSION");
3691
3692pub const NAME: &str = "x0x";
3694
3695#[cfg(test)]
3696mod tests {
3697 use super::*;
3698
3699 #[test]
3700 fn name_is_palindrome() {
3701 let name = NAME;
3702 let reversed: String = name.chars().rev().collect();
3703 assert_eq!(name, reversed, "x0x must be a palindrome");
3704 }
3705
3706 #[test]
3707 fn name_is_three_bytes() {
3708 assert_eq!(NAME.len(), 3, "x0x must be exactly three bytes");
3709 }
3710
3711 #[test]
3712 fn name_is_ai_native() {
3713 assert!(NAME.chars().all(|c| c.is_ascii_alphanumeric()));
3716 }
3717
3718 #[tokio::test]
3719 async fn agent_creates() {
3720 let agent = Agent::new().await;
3721 assert!(agent.is_ok());
3722 }
3723
3724 #[tokio::test]
3725 async fn agent_joins_network() {
3726 let agent = Agent::new().await.unwrap();
3727 assert!(agent.join_network().await.is_ok());
3728 }
3729
3730 #[tokio::test]
3731 async fn agent_subscribes() {
3732 let agent = Agent::new().await.unwrap();
3733 assert!(agent.subscribe("test-topic").await.is_err());
3735 }
3736
3737 #[tokio::test]
3738 async fn identity_announcement_machine_signature_verifies() {
3739 let agent = Agent::builder()
3740 .with_network_config(network::NetworkConfig::default())
3741 .build()
3742 .await
3743 .unwrap();
3744
3745 let announcement = agent.build_identity_announcement(false, false).unwrap();
3746 assert_eq!(announcement.agent_id, agent.agent_id());
3747 assert_eq!(announcement.machine_id, agent.machine_id());
3748 assert!(announcement.user_id.is_none());
3749 assert!(announcement.agent_certificate.is_none());
3750 assert!(announcement.verify().is_ok());
3751 }
3752
3753 #[tokio::test]
3754 async fn identity_announcement_requires_human_consent() {
3755 let agent = Agent::builder()
3756 .with_network_config(network::NetworkConfig::default())
3757 .build()
3758 .await
3759 .unwrap();
3760
3761 let err = agent.build_identity_announcement(true, false).unwrap_err();
3762 assert!(
3763 err.to_string().contains("explicit human consent"),
3764 "unexpected error: {err}"
3765 );
3766 }
3767
3768 #[tokio::test]
3769 async fn identity_announcement_with_user_requires_user_identity() {
3770 let agent = Agent::builder()
3771 .with_network_config(network::NetworkConfig::default())
3772 .build()
3773 .await
3774 .unwrap();
3775
3776 let err = agent.build_identity_announcement(true, true).unwrap_err();
3777 assert!(
3778 err.to_string().contains("no user identity is configured"),
3779 "unexpected error: {err}"
3780 );
3781 }
3782
3783 #[tokio::test]
3784 async fn announce_identity_populates_discovery_cache() {
3785 let user_key = identity::UserKeypair::generate().unwrap();
3786 let agent = Agent::builder()
3787 .with_network_config(network::NetworkConfig::default())
3788 .with_user_key(user_key)
3789 .build()
3790 .await
3791 .unwrap();
3792
3793 agent.announce_identity(true, true).await.unwrap();
3794 let discovered = agent.discovered_agent(agent.agent_id()).await.unwrap();
3795 let entry = discovered.expect("agent should discover its own announcement");
3796
3797 assert_eq!(entry.agent_id, agent.agent_id());
3798 assert_eq!(entry.machine_id, agent.machine_id());
3799 assert_eq!(entry.user_id, agent.user_id());
3800 }
3801
3802 #[test]
3806 fn identity_announcement_backward_compat_no_nat_fields() {
3807 use identity::{AgentId, MachineId};
3808
3809 #[derive(serde::Serialize, serde::Deserialize)]
3812 struct OldIdentityAnnouncementUnsigned {
3813 agent_id: AgentId,
3814 machine_id: MachineId,
3815 user_id: Option<identity::UserId>,
3816 agent_certificate: Option<identity::AgentCertificate>,
3817 machine_public_key: Vec<u8>,
3818 addresses: Vec<std::net::SocketAddr>,
3819 announced_at: u64,
3820 }
3821
3822 let agent_id = AgentId([1u8; 32]);
3823 let machine_id = MachineId([2u8; 32]);
3824 let old = OldIdentityAnnouncementUnsigned {
3825 agent_id,
3826 machine_id,
3827 user_id: None,
3828 agent_certificate: None,
3829 machine_public_key: vec![0u8; 10],
3830 addresses: Vec::new(),
3831 announced_at: 1234,
3832 };
3833 let bytes = bincode::serialize(&old).expect("serialize old announcement");
3834
3835 let result = bincode::deserialize::<IdentityAnnouncementUnsigned>(&bytes);
3843 assert!(
3846 result.is_err(),
3847 "Old-format announcement should not decode as new struct (protocol upgrade required)"
3848 );
3849 }
3850
3851 #[test]
3853 fn identity_announcement_nat_fields_round_trip() {
3854 use identity::{AgentId, MachineId};
3855
3856 let unsigned = IdentityAnnouncementUnsigned {
3857 agent_id: AgentId([1u8; 32]),
3858 machine_id: MachineId([2u8; 32]),
3859 user_id: None,
3860 agent_certificate: None,
3861 machine_public_key: vec![0u8; 10],
3862 addresses: Vec::new(),
3863 announced_at: 9999,
3864 nat_type: Some("FullCone".to_string()),
3865 can_receive_direct: Some(true),
3866 is_relay: Some(false),
3867 is_coordinator: Some(true),
3868 };
3869 let bytes = bincode::serialize(&unsigned).expect("serialize");
3870 let decoded: IdentityAnnouncementUnsigned =
3871 bincode::deserialize(&bytes).expect("deserialize");
3872 assert_eq!(decoded.nat_type.as_deref(), Some("FullCone"));
3873 assert_eq!(decoded.can_receive_direct, Some(true));
3874 assert_eq!(decoded.is_relay, Some(false));
3875 assert_eq!(decoded.is_coordinator, Some(true));
3876 }
3877
3878 #[test]
3881 fn identity_announcement_no_nat_fields_round_trip() {
3882 use identity::{AgentId, MachineId};
3883
3884 let unsigned = IdentityAnnouncementUnsigned {
3885 agent_id: AgentId([3u8; 32]),
3886 machine_id: MachineId([4u8; 32]),
3887 user_id: None,
3888 agent_certificate: None,
3889 machine_public_key: vec![0u8; 10],
3890 addresses: Vec::new(),
3891 announced_at: 42,
3892 nat_type: None,
3893 can_receive_direct: None,
3894 is_relay: None,
3895 is_coordinator: None,
3896 };
3897 let bytes = bincode::serialize(&unsigned).expect("serialize");
3898 let decoded: IdentityAnnouncementUnsigned =
3899 bincode::deserialize(&bytes).expect("deserialize");
3900 assert!(decoded.nat_type.is_none());
3901 assert!(decoded.can_receive_direct.is_none());
3902 assert!(decoded.is_relay.is_none());
3903 assert!(decoded.is_coordinator.is_none());
3904 }
3905}