1#![allow(clippy::unwrap_used)]
2#![allow(clippy::expect_used)]
3#![allow(missing_docs)]
4
5pub mod error;
55
56pub mod identity;
62
63pub mod storage;
68
69pub mod bootstrap;
74pub mod network;
76
77pub mod contacts;
79
80pub mod trust;
85
86pub mod connectivity;
91
92pub mod gossip;
94
95pub mod crdt;
97
98pub mod kv;
100
101pub mod groups;
103
104pub mod mls;
106
107pub mod direct;
112
113pub mod upgrade;
115
116pub mod files;
118
119pub mod api;
121
122pub mod cli;
124
125pub use gossip::{
127 GossipConfig, GossipRuntime, PubSubManager, PubSubMessage, SigningContext, Subscription,
128};
129
130pub use direct::{DirectMessage, DirectMessageReceiver, DirectMessaging};
132
133use saorsa_gossip_membership::Membership as _;
135
/// An agent: a cryptographic identity together with the optional networking,
/// gossip, discovery and direct-messaging state built around it.
#[derive(Debug)]
pub struct Agent {
    /// Identity handle; the ID and certificate accessors on `Agent` delegate to it.
    identity: std::sync::Arc<identity::Identity>,
    // NOTE(review): `network` is read by several methods in this file, so the
    // `dead_code` allowance below looks stale — confirm before removing it.
    #[allow(dead_code)]
    /// Network node, when one is configured. Methods that need the network
    /// return early (or report an error) when this is `None`.
    network: Option<std::sync::Arc<network::NetworkNode>>,
    /// Gossip runtime used for publishing and subscribing to identity
    /// announcements; announcing and listening fail when this is `None`.
    gossip_runtime: Option<std::sync::Arc<gossip::GossipRuntime>>,
    /// Bootstrap peer cache; fed with addresses of connected or announced
    /// peers and saved in `shutdown`.
    bootstrap_cache: Option<std::sync::Arc<ant_quic::BootstrapCache>>,
    /// Discovered agents keyed by agent ID, filled by the identity listener
    /// and by this agent's own announcements.
    identity_discovery_cache: std::sync::Arc<
        tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
    >,
    /// Set the first time the identity listener is started so that the
    /// listener task is spawned only once.
    identity_listener_started: std::sync::atomic::AtomicBool,
    /// Heartbeat interval in seconds (presumably the delay between periodic
    /// re-announcements — not read in the visible code; confirm).
    heartbeat_interval_secs: u64,
    /// Age limit in seconds applied to `announced_at` by `discovered_agents`.
    identity_ttl_secs: u64,
    /// Join handle of the heartbeat task, if any (presumably set when the
    /// heartbeat is started — not touched in the visible code; confirm).
    heartbeat_handle: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Rendezvous advertisement flag (not read in the visible code;
    /// presumably records whether a rendezvous advert was published — confirm).
    rendezvous_advertised: std::sync::atomic::AtomicBool,
    /// Contact store: consulted for trust decisions on incoming announcements
    /// and direct messages, and updated with observed machines.
    contact_store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>,
    /// Direct-messaging state: agent/machine registrations and subscriptions.
    direct_messaging: std::sync::Arc<direct::DirectMessaging>,
}
186
/// A message: an opaque byte payload labelled with an origin and a topic.
#[derive(Debug, Clone)]
pub struct Message {
    /// Origin of the message as a string (the exact format is not defined in
    /// this part of the file).
    pub origin: String,
    /// Raw message bytes.
    pub payload: Vec<u8>,
    /// Topic name associated with the message.
    pub topic: String,
}
197
/// Gossip topic on which signed identity announcements are published and to
/// which the identity listener subscribes.
pub const IDENTITY_ANNOUNCE_TOPIC: &str = "x0x.identity.announce.v1";
200
201#[must_use]
211pub fn shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
212 let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
213 format!("x0x.identity.shard.{shard}")
214}
215
/// Prefix of the per-shard rendezvous topic names; the shard number is
/// appended after a `.` by `rendezvous_shard_topic_for_agent`.
pub const RENDEZVOUS_SHARD_TOPIC_PREFIX: &str = "x0x.rendezvous.shard";
218
219#[must_use]
225pub fn rendezvous_shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
226 let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
227 format!("{RENDEZVOUS_SHARD_TOPIC_PREFIX}.{shard}")
228}
229
/// Identity heartbeat interval in seconds (300 s = 5 minutes).
// NOTE(review): presumably the default used when the builder does not
// override the interval — the use site is not visible here; confirm.
pub const IDENTITY_HEARTBEAT_INTERVAL_SECS: u64 = 300;
232
/// Identity time-to-live in seconds (900 s = 15 minutes, i.e. three times
/// `IDENTITY_HEARTBEAT_INTERVAL_SECS`).
// NOTE(review): presumably the default for `Agent::identity_ttl_secs` — the
// use site is not visible here; confirm.
pub const IDENTITY_TTL_SECS: u64 = 900;
238
/// The signed portion of an identity announcement: every field of
/// `IdentityAnnouncement` except `machine_signature`.
///
/// The bincode serialization of this struct is the byte string that the
/// machine key signs and that `IdentityAnnouncement::verify` checks, so the
/// field set and order must stay in step between signer and verifier.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct IdentityAnnouncementUnsigned {
    /// ID of the announcing agent.
    agent_id: identity::AgentId,
    /// ID of the machine the agent runs on.
    machine_id: identity::MachineId,
    /// User ID, present only when the user identity is disclosed.
    user_id: Option<identity::UserId>,
    /// Agent certificate accompanying a disclosed `user_id`.
    agent_certificate: Option<identity::AgentCertificate>,
    /// Raw bytes of the machine public key.
    machine_public_key: Vec<u8>,
    /// Socket addresses advertised for reaching the agent.
    addresses: Vec<std::net::SocketAddr>,
    /// Unix timestamp (seconds) at which the announcement was built.
    announced_at: u64,
    /// NAT type rendered as a string, when known.
    nat_type: Option<String>,
    /// Whether the node reports it can receive direct connections, when known.
    can_receive_direct: Option<bool>,
    /// Whether the node reports it is relaying, when known.
    is_relay: Option<bool>,
    /// Whether the node reports it is coordinating, when known.
    is_coordinator: Option<bool>,
}
257
/// A signed identity announcement as published on the identity gossip topics.
///
/// `machine_signature` covers the bincode serialization of all the other
/// fields; use `verify` to check it before trusting the contents.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IdentityAnnouncement {
    /// ID of the announcing agent.
    pub agent_id: identity::AgentId,
    /// ID of the machine the agent runs on; `verify` requires it to match
    /// `machine_public_key`.
    pub machine_id: identity::MachineId,
    /// User ID, present only when the user identity is disclosed; `verify`
    /// requires `agent_certificate` to be present exactly when this is.
    pub user_id: Option<identity::UserId>,
    /// Agent certificate accompanying a disclosed `user_id`.
    pub agent_certificate: Option<identity::AgentCertificate>,
    /// Raw bytes of the machine public key.
    pub machine_public_key: Vec<u8>,
    /// Raw bytes of the machine-key signature over the other fields.
    pub machine_signature: Vec<u8>,
    /// Socket addresses advertised for reaching the agent.
    pub addresses: Vec<std::net::SocketAddr>,
    /// Unix timestamp (seconds) at which the announcement was built.
    pub announced_at: u64,
    /// NAT type rendered as a string, when known.
    pub nat_type: Option<String>,
    /// Whether the node reports it can receive direct connections, when known.
    pub can_receive_direct: Option<bool>,
    /// Whether the node reports it is relaying, when known.
    pub is_relay: Option<bool>,
    /// Whether the node reports it is coordinating, when known.
    pub is_coordinator: Option<bool>,
}
293
294impl IdentityAnnouncement {
295 fn to_unsigned(&self) -> IdentityAnnouncementUnsigned {
296 IdentityAnnouncementUnsigned {
297 agent_id: self.agent_id,
298 machine_id: self.machine_id,
299 user_id: self.user_id,
300 agent_certificate: self.agent_certificate.clone(),
301 machine_public_key: self.machine_public_key.clone(),
302 addresses: self.addresses.clone(),
303 announced_at: self.announced_at,
304 nat_type: self.nat_type.clone(),
305 can_receive_direct: self.can_receive_direct,
306 is_relay: self.is_relay,
307 is_coordinator: self.is_coordinator,
308 }
309 }
310
311 pub fn verify(&self) -> error::Result<()> {
313 let machine_pub =
314 ant_quic::MlDsaPublicKey::from_bytes(&self.machine_public_key).map_err(|_| {
315 error::IdentityError::CertificateVerification(
316 "invalid machine public key in announcement".to_string(),
317 )
318 })?;
319 let derived_machine_id = identity::MachineId::from_public_key(&machine_pub);
320 if derived_machine_id != self.machine_id {
321 return Err(error::IdentityError::CertificateVerification(
322 "machine_id does not match machine public key".to_string(),
323 ));
324 }
325
326 let unsigned_bytes = bincode::serialize(&self.to_unsigned()).map_err(|e| {
327 error::IdentityError::Serialization(format!(
328 "failed to serialize announcement for verification: {e}"
329 ))
330 })?;
331 let signature = ant_quic::crypto::raw_public_keys::pqc::MlDsaSignature::from_bytes(
332 &self.machine_signature,
333 )
334 .map_err(|e| {
335 error::IdentityError::CertificateVerification(format!(
336 "invalid machine signature in announcement: {:?}",
337 e
338 ))
339 })?;
340 ant_quic::crypto::raw_public_keys::pqc::verify_with_ml_dsa(
341 &machine_pub,
342 &unsigned_bytes,
343 &signature,
344 )
345 .map_err(|e| {
346 error::IdentityError::CertificateVerification(format!(
347 "machine signature verification failed: {:?}",
348 e
349 ))
350 })?;
351
352 match (self.user_id, self.agent_certificate.as_ref()) {
353 (Some(user_id), Some(cert)) => {
354 cert.verify()?;
355 let cert_agent_id = cert.agent_id()?;
356 if cert_agent_id != self.agent_id {
357 return Err(error::IdentityError::CertificateVerification(
358 "agent certificate agent_id mismatch".to_string(),
359 ));
360 }
361 let cert_user_id = cert.user_id()?;
362 if cert_user_id != user_id {
363 return Err(error::IdentityError::CertificateVerification(
364 "agent certificate user_id mismatch".to_string(),
365 ));
366 }
367 Ok(())
368 }
369 (None, None) => Ok(()),
370 _ => Err(error::IdentityError::CertificateVerification(
371 "user identity disclosure requires matching certificate".to_string(),
372 )),
373 }
374 }
375}
376
/// A discovery-cache entry describing an agent learned from an identity
/// announcement (including this agent's own announcements).
#[derive(Debug, Clone)]
pub struct DiscoveredAgent {
    /// ID of the discovered agent.
    pub agent_id: identity::AgentId,
    /// ID of the machine the agent announced from.
    pub machine_id: identity::MachineId,
    /// User ID, when the announcement disclosed one.
    pub user_id: Option<identity::UserId>,
    /// Socket addresses taken from the announcement.
    pub addresses: Vec<std::net::SocketAddr>,
    /// `announced_at` value copied from the announcement (Unix seconds, as
    /// set by the announcer).
    pub announced_at: u64,
    /// Local Unix timestamp (seconds) at which this entry was written.
    pub last_seen: u64,
    // Hidden from rustdoc; holds the raw machine public key bytes copied from
    // the announcement.
    #[doc(hidden)]
    pub machine_public_key: Vec<u8>,
    /// NAT type string from the announcement, when present.
    pub nat_type: Option<String>,
    /// Direct-reachability flag from the announcement, when present.
    pub can_receive_direct: Option<bool>,
    /// Relay flag from the announcement, when present.
    pub is_relay: Option<bool>,
    /// Coordinator flag from the announcement, when present.
    pub is_coordinator: Option<bool>,
}
411
/// Builder for `Agent`. Every option starts as `None` (see `Agent::builder`).
// NOTE(review): the setter and `build` methods are outside this view; the
// per-field notes below are inferred from names and types — confirm.
#[derive(Debug)]
pub struct AgentBuilder {
    /// Optional path for the machine key.
    machine_key_path: Option<std::path::PathBuf>,
    /// Optional pre-supplied agent keypair.
    agent_keypair: Option<identity::AgentKeypair>,
    /// Optional path for the agent key.
    agent_key_path: Option<std::path::PathBuf>,
    /// Optional pre-supplied user keypair.
    user_keypair: Option<identity::UserKeypair>,
    /// Optional path for the user key.
    user_key_path: Option<std::path::PathBuf>,
    #[allow(dead_code)]
    /// Optional network configuration.
    network_config: Option<network::NetworkConfig>,
    /// Optional directory for the peer cache.
    peer_cache_dir: Option<std::path::PathBuf>,
    /// Optional override of the heartbeat interval, in seconds.
    heartbeat_interval_secs: Option<u64>,
    /// Optional override of the identity TTL, in seconds.
    identity_ttl_secs: Option<u64>,
    /// Optional path for the contact store.
    contact_store_path: Option<std::path::PathBuf>,
}
463
/// Everything `HeartbeatContext::announce` needs to build, sign and publish
/// an identity announcement without borrowing the `Agent`.
struct HeartbeatContext {
    /// Identity whose keys and IDs go into the announcement.
    identity: std::sync::Arc<identity::Identity>,
    /// Gossip runtime used to publish the announcement.
    runtime: std::sync::Arc<gossip::GossipRuntime>,
    /// Network node queried for addresses and NAT status.
    network: std::sync::Arc<network::NetworkNode>,
    /// Heartbeat interval in seconds (not read by `announce`; presumably used
    /// by the task that drives it — confirm).
    interval_secs: u64,
    /// Discovery cache that receives this agent's own entry after publishing.
    cache: std::sync::Arc<
        tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
    >,
}
474
475impl HeartbeatContext {
476 async fn announce(&self) -> error::Result<()> {
477 let machine_public_key = self
478 .identity
479 .machine_keypair()
480 .public_key()
481 .as_bytes()
482 .to_vec();
483 let announced_at = Agent::unix_timestamp_secs();
484
485 let mut addresses = match self.network.node_status().await {
488 Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
489 _ => match self.network.routable_addr().await {
490 Some(addr) => vec![addr],
491 None => Vec::new(),
492 },
493 };
494
495 let port = addresses.first().map(|a| a.port()).unwrap_or(5483);
499 if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
500 if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
501 if let Ok(local) = sock.local_addr() {
502 if let std::net::IpAddr::V6(v6) = local.ip() {
503 let segs = v6.segments();
504 let is_global = (segs[0] & 0xffc0) != 0xfe80
505 && (segs[0] & 0xff00) != 0xfd00
506 && !v6.is_loopback();
507 if is_global {
508 let v6_addr = std::net::SocketAddr::new(std::net::IpAddr::V6(v6), port);
509 if !addresses.contains(&v6_addr) {
510 addresses.push(v6_addr);
511 }
512 }
513 }
514 }
515 }
516 }
517
518 let (nat_type, can_receive_direct, is_relay, is_coordinator) =
520 match self.network.node_status().await {
521 Some(status) => (
522 Some(status.nat_type.to_string()),
523 Some(status.can_receive_direct),
524 Some(status.is_relaying),
525 Some(status.is_coordinating),
526 ),
527 None => (None, None, None, None),
528 };
529
530 let unsigned = IdentityAnnouncementUnsigned {
531 agent_id: self.identity.agent_id(),
532 machine_id: self.identity.machine_id(),
533 user_id: self
534 .identity
535 .user_keypair()
536 .map(identity::UserKeypair::user_id),
537 agent_certificate: self.identity.agent_certificate().cloned(),
538 machine_public_key: machine_public_key.clone(),
539 addresses,
540 announced_at,
541 nat_type: nat_type.clone(),
542 can_receive_direct,
543 is_relay,
544 is_coordinator,
545 };
546 let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
547 error::IdentityError::Serialization(format!(
548 "heartbeat: failed to serialize announcement: {e}"
549 ))
550 })?;
551 let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
552 self.identity.machine_keypair().secret_key(),
553 &unsigned_bytes,
554 )
555 .map_err(|e| {
556 error::IdentityError::Storage(std::io::Error::other(format!(
557 "heartbeat: failed to sign announcement: {:?}",
558 e
559 )))
560 })?
561 .as_bytes()
562 .to_vec();
563
564 let announcement = IdentityAnnouncement {
565 agent_id: unsigned.agent_id,
566 machine_id: unsigned.machine_id,
567 user_id: unsigned.user_id,
568 agent_certificate: unsigned.agent_certificate,
569 machine_public_key: machine_public_key.clone(),
570 machine_signature,
571 addresses: unsigned.addresses,
572 announced_at,
573 nat_type,
574 can_receive_direct,
575 is_relay,
576 is_coordinator,
577 };
578 let encoded = bincode::serialize(&announcement).map_err(|e| {
579 error::IdentityError::Serialization(format!(
580 "heartbeat: failed to serialize announcement: {e}"
581 ))
582 })?;
583 self.runtime
584 .pubsub()
585 .publish(
586 IDENTITY_ANNOUNCE_TOPIC.to_string(),
587 bytes::Bytes::from(encoded),
588 )
589 .await
590 .map_err(|e| {
591 error::IdentityError::Storage(std::io::Error::other(format!(
592 "heartbeat: publish failed: {e}"
593 )))
594 })?;
595 let now = Agent::unix_timestamp_secs();
596 self.cache.write().await.insert(
597 announcement.agent_id,
598 DiscoveredAgent {
599 agent_id: announcement.agent_id,
600 machine_id: announcement.machine_id,
601 user_id: announcement.user_id,
602 addresses: announcement.addresses,
603 announced_at: announcement.announced_at,
604 last_seen: now,
605 machine_public_key: machine_public_key.clone(),
606 nat_type: announcement.nat_type.clone(),
607 can_receive_direct: announcement.can_receive_direct,
608 is_relay: announcement.is_relay,
609 is_coordinator: announcement.is_coordinator,
610 },
611 );
612 Ok(())
613 }
614}
615
616impl Agent {
617 pub async fn new() -> error::Result<Self> {
624 Agent::builder().build().await
625 }
626
    /// Returns an `AgentBuilder` with every option unset (`None`).
    pub fn builder() -> AgentBuilder {
        AgentBuilder {
            machine_key_path: None,
            agent_keypair: None,
            agent_key_path: None,
            user_keypair: None,
            user_key_path: None,
            network_config: None,
            peer_cache_dir: None,
            heartbeat_interval_secs: None,
            identity_ttl_secs: None,
            contact_store_path: None,
        }
    }
647
    /// Returns a reference to this agent's identity.
    #[inline]
    #[must_use]
    pub fn identity(&self) -> &identity::Identity {
        &self.identity
    }
658
    /// Returns the machine ID reported by this agent's identity.
    #[inline]
    #[must_use]
    pub fn machine_id(&self) -> identity::MachineId {
        self.identity.machine_id()
    }
672
    /// Returns the agent ID reported by this agent's identity.
    #[inline]
    #[must_use]
    pub fn agent_id(&self) -> identity::AgentId {
        self.identity.agent_id()
    }
687
    /// Returns the user ID reported by this agent's identity, if it has one.
    #[inline]
    #[must_use]
    pub fn user_id(&self) -> Option<identity::UserId> {
        self.identity.user_id()
    }
697
    /// Returns the agent certificate held by this agent's identity, if any.
    #[inline]
    #[must_use]
    pub fn agent_certificate(&self) -> Option<&identity::AgentCertificate> {
        self.identity.agent_certificate()
    }
706
    /// Returns the network node, or `None` when no network is configured.
    #[must_use]
    pub fn network(&self) -> Option<&std::sync::Arc<network::NetworkNode>> {
        self.network.as_ref()
    }
712
    /// Returns the shared, lock-protected contact store used by this agent.
    #[must_use]
    pub fn contacts(&self) -> &std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>> {
        &self.contact_store
    }
724
725 pub async fn reachability(
731 &self,
732 agent_id: &identity::AgentId,
733 ) -> Option<connectivity::ReachabilityInfo> {
734 let cache = self.identity_discovery_cache.read().await;
735 cache
736 .get(agent_id)
737 .map(connectivity::ReachabilityInfo::from_discovered)
738 }
739
740 pub async fn connect_to_agent(
758 &self,
759 agent_id: &identity::AgentId,
760 ) -> error::Result<connectivity::ConnectOutcome> {
761 let discovered = {
763 let cache = self.identity_discovery_cache.read().await;
764 cache.get(agent_id).cloned()
765 };
766
767 let agent = match discovered {
768 Some(a) => a,
769 None => return Ok(connectivity::ConnectOutcome::NotFound),
770 };
771
772 let info = connectivity::ReachabilityInfo::from_discovered(&agent);
773
774 if info.addresses.is_empty() {
775 return Ok(connectivity::ConnectOutcome::Unreachable);
776 }
777
778 let Some(ref network) = self.network else {
779 return Ok(connectivity::ConnectOutcome::Unreachable);
780 };
781
782 if info.likely_direct() {
784 for addr in &info.addresses {
785 match network.connect_addr(*addr).await {
786 Ok(_peer_id) => {
787 if let Some(ref bc) = self.bootstrap_cache {
789 let peer_id = ant_quic::PeerId(agent.machine_id.0);
790 bc.add_from_connection(peer_id, vec![*addr], None).await;
791 }
792 self.direct_messaging
794 .mark_connected(agent.agent_id, agent.machine_id)
795 .await;
796 return Ok(connectivity::ConnectOutcome::Direct(*addr));
797 }
798 Err(e) => {
799 tracing::debug!("Direct connect to {} failed: {}", addr, e);
800 }
801 }
802 }
803 }
804
805 if info.needs_coordination() || !info.likely_direct() {
808 for addr in &info.addresses {
809 match network.connect_addr(*addr).await {
810 Ok(_peer_id) => {
811 if let Some(ref bc) = self.bootstrap_cache {
812 let peer_id = ant_quic::PeerId(agent.machine_id.0);
813 bc.add_from_connection(peer_id, vec![*addr], None).await;
814 }
815 self.direct_messaging
817 .mark_connected(agent.agent_id, agent.machine_id)
818 .await;
819 return Ok(connectivity::ConnectOutcome::Coordinated(*addr));
820 }
821 Err(e) => {
822 tracing::debug!("Coordinated connect to {} failed: {}", addr, e);
823 }
824 }
825 }
826 }
827
828 Ok(connectivity::ConnectOutcome::Unreachable)
829 }
830
831 pub async fn shutdown(&self) {
837 if let Some(ref cache) = self.bootstrap_cache {
838 if let Err(e) = cache.save().await {
839 tracing::warn!("Failed to save bootstrap cache on shutdown: {e}");
840 } else {
841 tracing::info!("Bootstrap cache saved on shutdown");
842 }
843 }
844 }
845
846 pub async fn send_direct(
876 &self,
877 agent_id: &identity::AgentId,
878 payload: Vec<u8>,
879 ) -> error::NetworkResult<()> {
880 let network = self.network.as_ref().ok_or_else(|| {
881 error::NetworkError::NodeCreation("network not initialized".to_string())
882 })?;
883
884 let machine_id = {
886 let cache = self.identity_discovery_cache.read().await;
887 cache.get(agent_id).map(|d| d.machine_id)
888 }
889 .ok_or(error::NetworkError::AgentNotFound(agent_id.0))?;
890
891 let ant_peer_id = ant_quic::PeerId(machine_id.0);
893 if !network.is_connected(&ant_peer_id).await {
894 return Err(error::NetworkError::AgentNotConnected(agent_id.0));
895 }
896
897 network
899 .send_direct(&ant_peer_id, &self.identity.agent_id().0, &payload)
900 .await?;
901
902 tracing::info!(
903 "Sent {} bytes directly to agent {:?}",
904 payload.len(),
905 agent_id
906 );
907
908 Ok(())
909 }
910
    /// Waits for the next direct message from any peer, without contact filtering.
    ///
    /// Returns `None` whenever the underlying receive does (for example when
    /// no network is configured).
    pub async fn recv_direct(&self) -> Option<direct::DirectMessage> {
        self.recv_direct_inner().await
    }
938
939 pub async fn recv_direct_filtered(&self) -> Option<direct::DirectMessage> {
968 loop {
969 let msg = self.recv_direct_inner().await?;
970
971 let contacts = self.contact_store.read().await;
973 if let Some(contact) = contacts.get(&msg.sender) {
974 if contact.trust_level == contacts::TrustLevel::Blocked {
975 tracing::debug!(
976 "Dropping direct message from blocked agent {:?}",
977 msg.sender
978 );
979 continue;
980 }
981 }
982
983 return Some(msg);
984 }
985 }
986
987 async fn recv_direct_inner(&self) -> Option<direct::DirectMessage> {
989 let network = self.network.as_ref()?;
990
991 let (ant_peer_id, payload) = network.recv_direct().await?;
993
994 if payload.len() < 32 {
996 tracing::warn!("Direct message too short to contain sender agent_id");
997 return None;
998 }
999
1000 let mut sender_bytes = [0u8; 32];
1001 sender_bytes.copy_from_slice(&payload[..32]);
1002 let sender = identity::AgentId(sender_bytes);
1003 let machine_id = identity::MachineId(ant_peer_id.0);
1004 let data = payload[32..].to_vec();
1005
1006 self.direct_messaging
1008 .register_agent(sender, machine_id)
1009 .await;
1010
1011 Some(direct::DirectMessage::new(sender, machine_id, data))
1012 }
1013
    /// Returns a new receiver obtained from the direct-messaging registry's
    /// `subscribe`.
    pub fn subscribe_direct(&self) -> direct::DirectMessageReceiver {
        self.direct_messaging.subscribe()
    }
1032
    /// Returns the shared direct-messaging registry used by this agent.
    pub fn direct_messaging(&self) -> &std::sync::Arc<direct::DirectMessaging> {
        &self.direct_messaging
    }
1039
1040 pub async fn is_agent_connected(&self, agent_id: &identity::AgentId) -> bool {
1050 let Some(network) = &self.network else {
1051 return false;
1052 };
1053
1054 let machine_id = {
1056 let cache = self.identity_discovery_cache.read().await;
1057 cache.get(agent_id).map(|d| d.machine_id)
1058 };
1059
1060 match machine_id {
1061 Some(mid) => {
1062 let ant_peer_id = ant_quic::PeerId(mid.0);
1063 network.is_connected(&ant_peer_id).await
1064 }
1065 None => false,
1066 }
1067 }
1068
1069 pub async fn connected_agents(&self) -> Vec<identity::AgentId> {
1074 let Some(network) = &self.network else {
1075 return Vec::new();
1076 };
1077
1078 let connected_peers = network.connected_peers().await;
1079 let cache = self.identity_discovery_cache.read().await;
1080
1081 cache
1083 .values()
1084 .filter(|agent| {
1085 let ant_peer_id = ant_quic::PeerId(agent.machine_id.0);
1086 connected_peers.contains(&ant_peer_id)
1087 })
1088 .map(|agent| agent.agent_id)
1089 .collect()
1090 }
1091
    /// Hands `store` to the gossip runtime's pub/sub layer via its
    /// `set_contacts`. Does nothing when no gossip runtime is configured.
    // NOTE(review): this does not replace `self.contact_store`; the agent keeps
    // using its own store for its listener and message filtering — confirm
    // callers pass the same store.
    pub fn set_contacts(&self, store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>) {
        if let Some(runtime) = &self.gossip_runtime {
            runtime.pubsub().set_contacts(store);
        }
    }
1104
1105 pub async fn announce_identity(
1123 &self,
1124 include_user_identity: bool,
1125 human_consent: bool,
1126 ) -> error::Result<()> {
1127 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1128 error::IdentityError::Storage(std::io::Error::other(
1129 "gossip runtime not initialized - configure agent with network first",
1130 ))
1131 })?;
1132
1133 self.start_identity_listener().await?;
1134
1135 let mut addresses = if let Some(network) = self.network.as_ref() {
1137 match network.node_status().await {
1138 Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
1139 _ => match network.routable_addr().await {
1140 Some(addr) => vec![addr],
1141 None => self.announcement_addresses(),
1142 },
1143 }
1144 } else {
1145 self.announcement_addresses()
1146 };
1147 let port = addresses.first().map(|a| a.port()).unwrap_or(5483);
1151
1152 if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
1154 if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
1155 if let Ok(local) = sock.local_addr() {
1156 if let std::net::IpAddr::V6(v6) = local.ip() {
1157 let segs = v6.segments();
1158 let is_global = (segs[0] & 0xffc0) != 0xfe80
1159 && (segs[0] & 0xff00) != 0xfd00
1160 && !v6.is_loopback();
1161 if is_global {
1162 let v6_addr = std::net::SocketAddr::new(std::net::IpAddr::V6(v6), port);
1163 if !addresses.contains(&v6_addr) {
1164 addresses.push(v6_addr);
1165 }
1166 }
1167 }
1168 }
1169 }
1170 }
1171 let announcement = self.build_identity_announcement_with_addrs(
1172 include_user_identity,
1173 human_consent,
1174 addresses,
1175 )?;
1176
1177 let encoded = bincode::serialize(&announcement).map_err(|e| {
1178 error::IdentityError::Serialization(format!(
1179 "failed to serialize identity announcement: {e}"
1180 ))
1181 })?;
1182
1183 let payload = bytes::Bytes::from(encoded);
1184
1185 let shard_topic = shard_topic_for_agent(&announcement.agent_id);
1187 runtime
1188 .pubsub()
1189 .publish(shard_topic, payload.clone())
1190 .await
1191 .map_err(|e| {
1192 error::IdentityError::Storage(std::io::Error::other(format!(
1193 "failed to publish identity announcement to shard topic: {e}"
1194 )))
1195 })?;
1196
1197 runtime
1199 .pubsub()
1200 .publish(IDENTITY_ANNOUNCE_TOPIC.to_string(), payload)
1201 .await
1202 .map_err(|e| {
1203 error::IdentityError::Storage(std::io::Error::other(format!(
1204 "failed to publish identity announcement: {e}"
1205 )))
1206 })?;
1207
1208 let now = Self::unix_timestamp_secs();
1209 self.identity_discovery_cache.write().await.insert(
1210 announcement.agent_id,
1211 DiscoveredAgent {
1212 agent_id: announcement.agent_id,
1213 machine_id: announcement.machine_id,
1214 user_id: announcement.user_id,
1215 addresses: announcement.addresses.clone(),
1216 announced_at: announcement.announced_at,
1217 last_seen: now,
1218 machine_public_key: announcement.machine_public_key.clone(),
1219 nat_type: announcement.nat_type.clone(),
1220 can_receive_direct: announcement.can_receive_direct,
1221 is_relay: announcement.is_relay,
1222 is_coordinator: announcement.is_coordinator,
1223 },
1224 );
1225
1226 Ok(())
1227 }
1228
1229 pub async fn discovered_agents(&self) -> error::Result<Vec<DiscoveredAgent>> {
1235 self.start_identity_listener().await?;
1236 let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
1237 let mut agents: Vec<_> = self
1238 .identity_discovery_cache
1239 .read()
1240 .await
1241 .values()
1242 .filter(|a| a.announced_at >= cutoff)
1243 .cloned()
1244 .collect();
1245 agents.sort_by(|a, b| a.agent_id.0.cmp(&b.agent_id.0));
1246 Ok(agents)
1247 }
1248
1249 pub async fn discovered_agents_unfiltered(&self) -> error::Result<Vec<DiscoveredAgent>> {
1258 self.start_identity_listener().await?;
1259 let mut agents: Vec<_> = self
1260 .identity_discovery_cache
1261 .read()
1262 .await
1263 .values()
1264 .cloned()
1265 .collect();
1266 agents.sort_by(|a, b| a.agent_id.0.cmp(&b.agent_id.0));
1267 Ok(agents)
1268 }
1269
1270 pub async fn discovered_agent(
1276 &self,
1277 agent_id: identity::AgentId,
1278 ) -> error::Result<Option<DiscoveredAgent>> {
1279 self.start_identity_listener().await?;
1280 Ok(self
1281 .identity_discovery_cache
1282 .read()
1283 .await
1284 .get(&agent_id)
1285 .cloned())
1286 }
1287
    /// Starts the background task that consumes identity announcements.
    ///
    /// The first successful call subscribes to `IDENTITY_ANNOUNCE_TOPIC` and to
    /// this agent's own shard topic and spawns a task; later calls return
    /// immediately. For every message the task:
    /// 1. decodes it (size-limited bincode) and verifies it, skipping failures;
    /// 2. drops it when the trust evaluator rejects the agent or machine;
    /// 3. records the machine in the contact store;
    /// 4. adds the advertised addresses to the bootstrap cache, if present;
    /// 5. writes the agent into the discovery cache;
    /// 6. attempts one automatic connection per remote agent.
    ///
    /// # Errors
    ///
    /// Returns `IdentityError::Storage` when no gossip runtime is configured.
    async fn start_identity_listener(&self) -> error::Result<()> {
        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
            error::IdentityError::Storage(std::io::Error::other(
                "gossip runtime not initialized - configure agent with network first",
            ))
        })?;

        // `swap` returns the previous value: `true` means another call already
        // started (or is starting) the listener.
        if self
            .identity_listener_started
            .swap(true, std::sync::atomic::Ordering::AcqRel)
        {
            return Ok(());
        }

        let mut sub_legacy = runtime
            .pubsub()
            .subscribe(IDENTITY_ANNOUNCE_TOPIC.to_string())
            .await;
        let own_shard_topic = shard_topic_for_agent(&self.agent_id());
        let mut sub_shard = runtime.pubsub().subscribe(own_shard_topic).await;
        // Clone the shared handles so the task does not borrow `self`.
        let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
        let bootstrap_cache = self.bootstrap_cache.clone();
        let contact_store = std::sync::Arc::clone(&self.contact_store);
        let network = self.network.as_ref().map(std::sync::Arc::clone);
        let own_agent_id = self.agent_id();

        tokio::spawn(async move {
            // Agents for which an automatic connection was already attempted.
            // NOTE(review): entries are never removed, so this set only grows.
            let mut auto_connect_attempted = std::collections::HashSet::<identity::AgentId>::new();

            loop {
                // Take a message from either subscription; leave the loop once
                // neither branch can produce one.
                let msg = tokio::select! {
                    Some(m) = sub_legacy.recv() => m,
                    Some(m) = sub_shard.recv() => m,
                    else => break,
                };
                // Decode with an explicit size limit on the untrusted payload.
                let decoded = {
                    use bincode::Options;
                    bincode::options()
                        .with_fixint_encoding()
                        .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
                        .allow_trailing_bytes()
                        .deserialize::<IdentityAnnouncement>(&msg.payload)
                };
                let announcement = match decoded {
                    Ok(a) => a,
                    Err(e) => {
                        tracing::debug!("Ignoring invalid identity announcement payload: {}", e);
                        continue;
                    }
                };

                if let Err(e) = announcement.verify() {
                    tracing::warn!("Ignoring unverifiable identity announcement: {}", e);
                    continue;
                }

                // Trust check: drop announcements from blocked agents or from
                // machines that do not match the agent's pinned list.
                {
                    let store = contact_store.read().await;
                    let evaluator = trust::TrustEvaluator::new(&store);
                    let decision = evaluator.evaluate(&trust::TrustContext {
                        agent_id: &announcement.agent_id,
                        machine_id: &announcement.machine_id,
                    });
                    match decision {
                        trust::TrustDecision::RejectBlocked => {
                            tracing::debug!(
                                "Dropping identity announcement from blocked agent {:?}",
                                hex::encode(&announcement.agent_id.0[..8]),
                            );
                            continue;
                        }
                        trust::TrustDecision::RejectMachineMismatch => {
                            tracing::warn!(
                                "Dropping identity announcement from agent {:?}: machine {:?} not in pinned list",
                                hex::encode(&announcement.agent_id.0[..8]),
                                hex::encode(&announcement.machine_id.0[..8]),
                            );
                            continue;
                        }
                        _ => {}
                    }
                }

                // Record the machine this agent announced from.
                {
                    let mut store = contact_store.write().await;
                    let record = contacts::MachineRecord::new(announcement.machine_id, None);
                    store.add_machine(&announcement.agent_id, record);
                }

                // Local receive time in Unix seconds (0 if the clock is before the epoch).
                let now = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .map_or(0, |d| d.as_secs());

                // Feed advertised addresses into the bootstrap cache.
                if !announcement.addresses.is_empty() {
                    if let Some(ref bc) = &bootstrap_cache {
                        let peer_id = ant_quic::PeerId(announcement.machine_id.0);
                        bc.add_from_connection(peer_id, announcement.addresses.clone(), None)
                            .await;
                        tracing::debug!(
                            "Added {} addresses from identity announcement to bootstrap cache for agent {:?} (machine {:?})",
                            announcement.addresses.len(),
                            announcement.agent_id,
                            hex::encode(&announcement.machine_id.0[..8]),
                        );
                    }
                }

                // Insert or replace the discovery-cache entry for this agent.
                cache.write().await.insert(
                    announcement.agent_id,
                    DiscoveredAgent {
                        agent_id: announcement.agent_id,
                        machine_id: announcement.machine_id,
                        user_id: announcement.user_id,
                        addresses: announcement.addresses.clone(),
                        announced_at: announcement.announced_at,
                        last_seen: now,
                        machine_public_key: announcement.machine_public_key.clone(),
                        nat_type: announcement.nat_type.clone(),
                        can_receive_direct: announcement.can_receive_direct,
                        is_relay: announcement.is_relay,
                        is_coordinator: announcement.is_coordinator,
                    },
                );

                // Auto-connect: only for other agents that advertised addresses
                // and have not been attempted before.
                if announcement.agent_id != own_agent_id
                    && !announcement.addresses.is_empty()
                    && !auto_connect_attempted.contains(&announcement.agent_id)
                {
                    if let Some(ref net) = &network {
                        let ant_peer = ant_quic::PeerId(announcement.machine_id.0);
                        if !net.is_connected(&ant_peer).await {
                            // Mark as attempted before spawning so later
                            // announcements do not trigger another attempt.
                            auto_connect_attempted.insert(announcement.agent_id);
                            let net = std::sync::Arc::clone(net);
                            let addresses = announcement.addresses.clone();
                            // Detached task: try each address until one connects.
                            tokio::spawn(async move {
                                for addr in &addresses {
                                    match net.connect_addr(*addr).await {
                                        Ok(_) => {
                                            tracing::info!(
                                                "Auto-connected to discovered agent at {addr}",
                                            );
                                            return;
                                        }
                                        Err(e) => {
                                            tracing::debug!("Auto-connect to {addr} failed: {e}",);
                                        }
                                    }
                                }
                                tracing::debug!(
                                    "Auto-connect exhausted all {} addresses for discovered agent",
                                    addresses.len(),
                                );
                            });
                        }
                    }
                }
            }
        });

        Ok(())
    }
1462
1463 fn unix_timestamp_secs() -> u64 {
1464 std::time::SystemTime::now()
1465 .duration_since(std::time::UNIX_EPOCH)
1466 .map_or(0, |d| d.as_secs())
1467 }
1468
1469 fn announcement_addresses(&self) -> Vec<std::net::SocketAddr> {
1470 match self.network.as_ref().and_then(|n| n.local_addr()) {
1473 Some(addr) if addr.port() > 0 && !addr.ip().is_unspecified() => vec![addr],
1474 _ => Vec::new(),
1475 }
1476 }
1477
1478 fn build_identity_announcement(
1479 &self,
1480 include_user_identity: bool,
1481 human_consent: bool,
1482 ) -> error::Result<IdentityAnnouncement> {
1483 self.build_identity_announcement_with_addrs(
1484 include_user_identity,
1485 human_consent,
1486 self.announcement_addresses(),
1487 )
1488 }
1489
1490 fn build_identity_announcement_with_addrs(
1491 &self,
1492 include_user_identity: bool,
1493 human_consent: bool,
1494 addresses: Vec<std::net::SocketAddr>,
1495 ) -> error::Result<IdentityAnnouncement> {
1496 if include_user_identity && !human_consent {
1497 return Err(error::IdentityError::Storage(std::io::Error::other(
1498 "human identity disclosure requires explicit human consent — set human_consent: true in the request body",
1499 )));
1500 }
1501
1502 let (user_id, agent_certificate) = if include_user_identity {
1503 let user_id = self.user_id().ok_or_else(|| {
1504 error::IdentityError::Storage(std::io::Error::other(
1505 "human identity disclosure requested but no user identity is configured — set user_key_path in your config.toml to point at your user keypair file",
1506 ))
1507 })?;
1508 let cert = self.agent_certificate().cloned().ok_or_else(|| {
1509 error::IdentityError::Storage(std::io::Error::other(
1510 "human identity disclosure requested but agent certificate is missing",
1511 ))
1512 })?;
1513 (Some(user_id), Some(cert))
1514 } else {
1515 (None, None)
1516 };
1517
1518 let machine_public_key = self
1519 .identity
1520 .machine_keypair()
1521 .public_key()
1522 .as_bytes()
1523 .to_vec();
1524
1525 let unsigned = IdentityAnnouncementUnsigned {
1529 agent_id: self.agent_id(),
1530 machine_id: self.machine_id(),
1531 user_id,
1532 agent_certificate: agent_certificate.clone(),
1533 machine_public_key: machine_public_key.clone(),
1534 addresses,
1535 announced_at: Self::unix_timestamp_secs(),
1536 nat_type: None,
1537 can_receive_direct: None,
1538 is_relay: None,
1539 is_coordinator: None,
1540 };
1541 let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
1542 error::IdentityError::Serialization(format!(
1543 "failed to serialize unsigned identity announcement: {e}"
1544 ))
1545 })?;
1546 let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
1547 self.identity.machine_keypair().secret_key(),
1548 &unsigned_bytes,
1549 )
1550 .map_err(|e| {
1551 error::IdentityError::Storage(std::io::Error::other(format!(
1552 "failed to sign identity announcement with machine key: {:?}",
1553 e
1554 )))
1555 })?
1556 .as_bytes()
1557 .to_vec();
1558
1559 Ok(IdentityAnnouncement {
1560 agent_id: unsigned.agent_id,
1561 machine_id: unsigned.machine_id,
1562 user_id: unsigned.user_id,
1563 agent_certificate: unsigned.agent_certificate,
1564 machine_public_key,
1565 machine_signature,
1566 addresses: unsigned.addresses,
1567 announced_at: unsigned.announced_at,
1568 nat_type: unsigned.nat_type,
1569 can_receive_direct: unsigned.can_receive_direct,
1570 is_relay: unsigned.is_relay,
1571 is_coordinator: unsigned.is_coordinator,
1572 })
1573 }
1574
1575 pub async fn join_network(&self) -> error::Result<()> {
1584 let Some(network) = self.network.as_ref() else {
1585 tracing::debug!("join_network called but no network configured");
1586 return Ok(());
1587 };
1588
1589 if let Some(ref runtime) = self.gossip_runtime {
1590 runtime.start().await.map_err(|e| {
1591 error::IdentityError::Storage(std::io::Error::other(format!(
1592 "failed to start gossip runtime: {e}"
1593 )))
1594 })?;
1595 tracing::info!("Gossip runtime started");
1596 }
1597 self.start_identity_listener().await?;
1598
1599 let bootstrap_nodes = network.config().bootstrap_nodes.clone();
1600 if bootstrap_nodes.is_empty() {
1601 tracing::debug!("No bootstrap peers configured");
1602 if let Err(e) = self.announce_identity(false, false).await {
1603 tracing::warn!("Initial identity announcement failed: {}", e);
1604 }
1605 if let Err(e) = self.start_identity_heartbeat().await {
1606 tracing::warn!("Failed to start identity heartbeat: {e}");
1607 }
1608 return Ok(());
1609 }
1610
1611 let min_connected = 3;
1612 let mut all_connected: Vec<std::net::SocketAddr> = Vec::new();
1613
1614 if let Some(ref cache) = self.bootstrap_cache {
1616 const PHASE1_PEER_CANDIDATES: usize = 12;
1617 let cached_peers = cache.select_peers(PHASE1_PEER_CANDIDATES).await;
1618 if !cached_peers.is_empty() {
1619 tracing::info!("Phase 1: Trying {} cached peers", cached_peers.len());
1620 let (succeeded, _failed) = self
1621 .connect_cached_peers_parallel_tracked(network, &cached_peers)
1622 .await;
1623 all_connected.extend(&succeeded);
1624 tracing::info!(
1625 "Phase 1: {}/{} cached peers connected",
1626 succeeded.len(),
1627 cached_peers.len()
1628 );
1629 }
1630 }
1631
1632 if all_connected.len() < min_connected {
1634 let remaining: Vec<std::net::SocketAddr> = bootstrap_nodes
1635 .iter()
1636 .filter(|addr| !all_connected.contains(addr))
1637 .copied()
1638 .collect();
1639
1640 let (succeeded, mut failed) = self
1642 .connect_peers_parallel_tracked(network, &remaining)
1643 .await;
1644 all_connected.extend(&succeeded);
1645 tracing::info!(
1646 "Phase 2 round 1: {}/{} bootstrap peers connected",
1647 succeeded.len(),
1648 remaining.len()
1649 );
1650
1651 for round in 2..=3 {
1653 if failed.is_empty() {
1654 break;
1655 }
1656 let delay = std::time::Duration::from_secs(if round == 2 { 10 } else { 15 });
1657 tracing::info!(
1658 "Retrying {} failed peers in {}s (round {})",
1659 failed.len(),
1660 delay.as_secs(),
1661 round
1662 );
1663 tokio::time::sleep(delay).await;
1664
1665 let (succeeded, still_failed) =
1666 self.connect_peers_parallel_tracked(network, &failed).await;
1667 all_connected.extend(&succeeded);
1668 failed = still_failed;
1669 tracing::info!(
1670 "Phase 2 round {}: {} total peers connected",
1671 round,
1672 all_connected.len()
1673 );
1674 }
1675
1676 if !failed.is_empty() {
1677 tracing::warn!(
1678 "Could not connect to {} bootstrap peers: {:?}",
1679 failed.len(),
1680 failed
1681 );
1682 }
1683 }
1684
1685 tracing::info!(
1686 "Network join complete. Connected to {} peers.",
1687 all_connected.len()
1688 );
1689
1690 if let Some(ref runtime) = self.gossip_runtime {
1692 let seeds: Vec<String> = all_connected.iter().map(|addr| addr.to_string()).collect();
1693 if !seeds.is_empty() {
1694 if let Err(e) = runtime.membership().join(seeds).await {
1695 tracing::warn!("HyParView membership join failed: {e}");
1696 }
1697 }
1698 }
1699
1700 if let Err(e) = self.announce_identity(false, false).await {
1701 tracing::warn!("Initial identity announcement failed: {}", e);
1702 }
1703 if let Err(e) = self.start_identity_heartbeat().await {
1704 tracing::warn!("Failed to start identity heartbeat: {e}");
1705 }
1706
1707 Ok(())
1708 }
1709
1710 async fn connect_cached_peers_parallel_tracked(
1712 &self,
1713 network: &std::sync::Arc<network::NetworkNode>,
1714 peers: &[ant_quic::CachedPeer],
1715 ) -> (Vec<std::net::SocketAddr>, Vec<ant_quic::PeerId>) {
1716 let handles: Vec<_> = peers
1717 .iter()
1718 .map(|peer| {
1719 let net = network.clone();
1720 let peer_id = peer.peer_id;
1721 tokio::spawn(async move {
1722 tracing::debug!("Connecting to cached peer: {:?}", peer_id);
1723 match net.connect_cached_peer(peer_id).await {
1724 Ok(addr) => {
1725 tracing::info!("Connected to cached peer {:?} at {}", peer_id, addr);
1726 Ok(addr)
1727 }
1728 Err(e) => {
1729 tracing::warn!("Failed to connect to cached peer {:?}: {}", peer_id, e);
1730 Err(peer_id)
1731 }
1732 }
1733 })
1734 })
1735 .collect();
1736
1737 let mut succeeded = Vec::new();
1738 let mut failed = Vec::new();
1739 for handle in handles {
1740 match handle.await {
1741 Ok(Ok(addr)) => succeeded.push(addr),
1742 Ok(Err(peer_id)) => failed.push(peer_id),
1743 Err(e) => tracing::error!("Connection task panicked: {}", e),
1744 }
1745 }
1746 (succeeded, failed)
1747 }
1748
1749 async fn connect_peers_parallel_tracked(
1751 &self,
1752 network: &std::sync::Arc<network::NetworkNode>,
1753 addrs: &[std::net::SocketAddr],
1754 ) -> (Vec<std::net::SocketAddr>, Vec<std::net::SocketAddr>) {
1755 let handles: Vec<_> = addrs
1756 .iter()
1757 .map(|addr| {
1758 let net = network.clone();
1759 let addr = *addr;
1760 tokio::spawn(async move {
1761 tracing::debug!("Connecting to peer: {}", addr);
1762 match net.connect_addr(addr).await {
1763 Ok(_) => {
1764 tracing::info!("Connected to peer: {}", addr);
1765 Ok(addr)
1766 }
1767 Err(e) => {
1768 tracing::warn!("Failed to connect to {}: {}", addr, e);
1769 Err(addr)
1770 }
1771 }
1772 })
1773 })
1774 .collect();
1775
1776 let mut succeeded = Vec::new();
1777 let mut failed = Vec::new();
1778 for handle in handles {
1779 match handle.await {
1780 Ok(Ok(addr)) => succeeded.push(addr),
1781 Ok(Err(addr)) => failed.push(addr),
1782 Err(e) => tracing::error!("Connection task panicked: {}", e),
1783 }
1784 }
1785 (succeeded, failed)
1786 }
1787
1788 pub async fn subscribe(&self, topic: &str) -> error::Result<Subscription> {
1798 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1799 error::IdentityError::Storage(std::io::Error::other(
1800 "gossip runtime not initialized - configure agent with network first",
1801 ))
1802 })?;
1803 Ok(runtime.pubsub().subscribe(topic.to_string()).await)
1804 }
1805
1806 pub async fn publish(&self, topic: &str, payload: Vec<u8>) -> error::Result<()> {
1818 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1819 error::IdentityError::Storage(std::io::Error::other(
1820 "gossip runtime not initialized - configure agent with network first",
1821 ))
1822 })?;
1823 runtime
1824 .pubsub()
1825 .publish(topic.to_string(), bytes::Bytes::from(payload))
1826 .await
1827 .map_err(|e| {
1828 error::IdentityError::Storage(std::io::Error::other(format!(
1829 "publish failed: {}",
1830 e
1831 )))
1832 })
1833 }
1834
1835 pub async fn peers(&self) -> error::Result<Vec<saorsa_gossip_types::PeerId>> {
1843 let network = self.network.as_ref().ok_or_else(|| {
1844 error::IdentityError::Storage(std::io::Error::other(
1845 "network not initialized - configure agent with network first",
1846 ))
1847 })?;
1848 let ant_peers = network.connected_peers().await;
1849 Ok(ant_peers
1850 .into_iter()
1851 .map(|p| saorsa_gossip_types::PeerId::new(p.0))
1852 .collect())
1853 }
1854
1855 pub async fn presence(&self) -> error::Result<Vec<identity::AgentId>> {
1863 self.start_identity_listener().await?;
1864 let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
1865 let mut agents: Vec<_> = self
1866 .identity_discovery_cache
1867 .read()
1868 .await
1869 .values()
1870 .filter(|a| a.announced_at >= cutoff)
1871 .map(|a| a.agent_id)
1872 .collect();
1873 agents.sort_by(|a, b| a.0.cmp(&b.0));
1874 Ok(agents)
1875 }
1876
1877 pub async fn find_agent(
1894 &self,
1895 agent_id: identity::AgentId,
1896 ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
1897 self.start_identity_listener().await?;
1898
1899 if let Some(addrs) = self
1901 .identity_discovery_cache
1902 .read()
1903 .await
1904 .get(&agent_id)
1905 .map(|e| e.addresses.clone())
1906 {
1907 return Ok(Some(addrs));
1908 }
1909
1910 let runtime = match self.gossip_runtime.as_ref() {
1912 Some(r) => r,
1913 None => return Ok(None),
1914 };
1915 let shard_topic = shard_topic_for_agent(&agent_id);
1916 let mut sub = runtime.pubsub().subscribe(shard_topic).await;
1917 let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
1918 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
1919
1920 loop {
1921 if tokio::time::Instant::now() >= deadline {
1922 break;
1923 }
1924 let timeout = tokio::time::sleep_until(deadline);
1925 tokio::select! {
1926 Some(msg) = sub.recv() => {
1927 if let Ok(ann) = {
1928 use bincode::Options;
1929 bincode::DefaultOptions::new()
1930 .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
1931 .deserialize::<IdentityAnnouncement>(&msg.payload)
1932 } {
1933 if ann.verify().is_ok() && ann.agent_id == agent_id {
1934 let now = std::time::SystemTime::now()
1935 .duration_since(std::time::UNIX_EPOCH)
1936 .map_or(0, |d| d.as_secs());
1937 let addrs = ann.addresses.clone();
1938 cache.write().await.insert(
1939 ann.agent_id,
1940 DiscoveredAgent {
1941 agent_id: ann.agent_id,
1942 machine_id: ann.machine_id,
1943 user_id: ann.user_id,
1944 addresses: ann.addresses,
1945 announced_at: ann.announced_at,
1946 last_seen: now,
1947 machine_public_key: ann.machine_public_key.clone(),
1948 nat_type: ann.nat_type.clone(),
1949 can_receive_direct: ann.can_receive_direct,
1950 is_relay: ann.is_relay,
1951 is_coordinator: ann.is_coordinator,
1952 },
1953 );
1954 return Ok(Some(addrs));
1955 }
1956 }
1957 }
1958 _ = timeout => break,
1959 }
1960 }
1961
1962 if let Some(addrs) = self.find_agent_rendezvous(agent_id, 5).await? {
1964 return Ok(Some(addrs));
1965 }
1966
1967 Ok(None)
1968 }
1969
1970 pub async fn find_agents_by_user(
1983 &self,
1984 user_id: identity::UserId,
1985 ) -> error::Result<Vec<DiscoveredAgent>> {
1986 self.start_identity_listener().await?;
1987 let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
1988 Ok(self
1989 .identity_discovery_cache
1990 .read()
1991 .await
1992 .values()
1993 .filter(|a| a.announced_at >= cutoff && a.user_id == Some(user_id))
1994 .cloned()
1995 .collect())
1996 }
1997
1998 #[must_use]
2003 pub fn local_addr(&self) -> Option<std::net::SocketAddr> {
2004 self.network.as_ref().and_then(|n| n.local_addr())
2005 }
2006
2007 pub fn build_announcement(
2015 &self,
2016 include_user: bool,
2017 consent: bool,
2018 ) -> error::Result<IdentityAnnouncement> {
2019 self.build_identity_announcement(include_user, consent)
2020 }
2021
2022 pub async fn start_identity_heartbeat(&self) -> error::Result<()> {
2035 let mut handle_guard = self.heartbeat_handle.lock().await;
2036 if handle_guard.is_some() {
2037 return Ok(());
2038 }
2039 let Some(runtime) = self.gossip_runtime.as_ref().map(std::sync::Arc::clone) else {
2040 return Err(error::IdentityError::Storage(std::io::Error::other(
2041 "gossip runtime not initialized — cannot start heartbeat",
2042 )));
2043 };
2044 let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
2045 return Err(error::IdentityError::Storage(std::io::Error::other(
2046 "network not initialized — cannot start heartbeat",
2047 )));
2048 };
2049 let ctx = HeartbeatContext {
2050 identity: std::sync::Arc::clone(&self.identity),
2051 runtime,
2052 network,
2053 interval_secs: self.heartbeat_interval_secs,
2054 cache: std::sync::Arc::clone(&self.identity_discovery_cache),
2055 };
2056 let handle = tokio::task::spawn(async move {
2057 let mut ticker =
2058 tokio::time::interval(std::time::Duration::from_secs(ctx.interval_secs));
2059 ticker.tick().await; loop {
2061 ticker.tick().await;
2062 if let Err(e) = ctx.announce().await {
2063 tracing::warn!("identity heartbeat announce failed: {e}");
2064 }
2065 }
2066 });
2067 *handle_guard = Some(handle);
2068 Ok(())
2069 }
2070
2071 pub async fn advertise_identity(&self, validity_ms: u64) -> error::Result<()> {
2101 use saorsa_gossip_rendezvous::{Capability, ProviderSummary};
2102
2103 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2104 error::IdentityError::Storage(std::io::Error::other(
2105 "gossip runtime not initialized — cannot advertise identity",
2106 ))
2107 })?;
2108
2109 let peer_id = runtime.peer_id();
2110 let addresses = self.announcement_addresses();
2111 let addr_bytes = bincode::serialize(&addresses).map_err(|e| {
2112 error::IdentityError::Serialization(format!(
2113 "failed to serialize addresses for rendezvous: {e}"
2114 ))
2115 })?;
2116
2117 let mut summary = ProviderSummary::new(
2118 self.agent_id().0,
2119 peer_id,
2120 vec![Capability::Identity],
2121 validity_ms,
2122 )
2123 .with_extensions(addr_bytes);
2124
2125 summary
2126 .sign_raw(self.identity.machine_keypair().secret_key().as_bytes())
2127 .map_err(|e| {
2128 error::IdentityError::Storage(std::io::Error::other(format!(
2129 "failed to sign rendezvous summary: {e}"
2130 )))
2131 })?;
2132
2133 let cbor_bytes = summary.to_cbor().map_err(|e| {
2134 error::IdentityError::Serialization(format!(
2135 "failed to CBOR-encode rendezvous summary: {e}"
2136 ))
2137 })?;
2138
2139 let topic = rendezvous_shard_topic_for_agent(&self.agent_id());
2140 runtime
2141 .pubsub()
2142 .publish(topic, bytes::Bytes::from(cbor_bytes))
2143 .await
2144 .map_err(|e| {
2145 error::IdentityError::Storage(std::io::Error::other(format!(
2146 "failed to publish rendezvous summary: {e}"
2147 )))
2148 })?;
2149
2150 self.rendezvous_advertised
2151 .store(true, std::sync::atomic::Ordering::Relaxed);
2152 Ok(())
2153 }
2154
2155 pub async fn find_agent_rendezvous(
2168 &self,
2169 agent_id: identity::AgentId,
2170 timeout_secs: u64,
2171 ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
2172 use saorsa_gossip_rendezvous::ProviderSummary;
2173
2174 let runtime = match self.gossip_runtime.as_ref() {
2175 Some(r) => r,
2176 None => return Ok(None),
2177 };
2178
2179 let topic = rendezvous_shard_topic_for_agent(&agent_id);
2180 let mut sub = runtime.pubsub().subscribe(topic).await;
2181 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
2182
2183 loop {
2184 if tokio::time::Instant::now() >= deadline {
2185 break;
2186 }
2187 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
2188 tokio::select! {
2189 Some(msg) = sub.recv() => {
2190 let summary = match ProviderSummary::from_cbor(&msg.payload) {
2191 Ok(s) => s,
2192 Err(_) => continue,
2193 };
2194 if summary.target != agent_id.0 {
2195 continue;
2196 }
2197 let cached_pub = self
2203 .identity_discovery_cache
2204 .read()
2205 .await
2206 .get(&agent_id)
2207 .map(|e| e.machine_public_key.clone());
2208 if let Some(pub_bytes) = cached_pub {
2209 if !pub_bytes.is_empty()
2210 && !summary.verify_raw(&pub_bytes).unwrap_or(false)
2211 {
2212 tracing::warn!(
2213 "Rendezvous summary signature verification failed for agent {:?}; discarding",
2214 agent_id
2215 );
2216 continue;
2217 }
2218 }
2219 let addrs: Vec<std::net::SocketAddr> = summary
2221 .extensions
2222 .as_deref()
2223 .and_then(|b| {
2224 use bincode::Options;
2225 bincode::DefaultOptions::new()
2226 .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
2227 .deserialize(b)
2228 .ok()
2229 })
2230 .unwrap_or_default();
2231 if !addrs.is_empty() {
2232 return Ok(Some(addrs));
2233 }
2234 }
2235 _ = tokio::time::sleep(remaining) => break,
2236 }
2237 }
2238
2239 Ok(None)
2240 }
2241
2242 #[doc(hidden)]
2248 pub async fn insert_discovered_agent_for_testing(&self, agent: DiscoveredAgent) {
2249 self.identity_discovery_cache
2250 .write()
2251 .await
2252 .insert(agent.agent_id, agent);
2253 }
2254
2255 pub async fn create_task_list(&self, name: &str, topic: &str) -> error::Result<TaskListHandle> {
2279 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2280 error::IdentityError::Storage(std::io::Error::other(
2281 "gossip runtime not initialized - configure agent with network first",
2282 ))
2283 })?;
2284
2285 let peer_id = runtime.peer_id();
2286 let list_id = crdt::TaskListId::from_content(name, &self.agent_id(), 0);
2287 let task_list = crdt::TaskList::new(list_id, name.to_string(), peer_id);
2288
2289 let sync = crdt::TaskListSync::new(
2290 task_list,
2291 std::sync::Arc::clone(runtime.pubsub()),
2292 topic.to_string(),
2293 30,
2294 )
2295 .map_err(|e| {
2296 error::IdentityError::Storage(std::io::Error::other(format!(
2297 "task list sync creation failed: {}",
2298 e
2299 )))
2300 })?;
2301
2302 let sync = std::sync::Arc::new(sync);
2303 sync.start().await.map_err(|e| {
2304 error::IdentityError::Storage(std::io::Error::other(format!(
2305 "task list sync start failed: {}",
2306 e
2307 )))
2308 })?;
2309
2310 Ok(TaskListHandle {
2311 sync,
2312 agent_id: self.agent_id(),
2313 peer_id,
2314 })
2315 }
2316
2317 pub async fn join_task_list(&self, topic: &str) -> error::Result<TaskListHandle> {
2340 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2341 error::IdentityError::Storage(std::io::Error::other(
2342 "gossip runtime not initialized - configure agent with network first",
2343 ))
2344 })?;
2345
2346 let peer_id = runtime.peer_id();
2347 let list_id = crdt::TaskListId::from_content(topic, &self.agent_id(), 0);
2349 let task_list = crdt::TaskList::new(list_id, String::new(), peer_id);
2350
2351 let sync = crdt::TaskListSync::new(
2352 task_list,
2353 std::sync::Arc::clone(runtime.pubsub()),
2354 topic.to_string(),
2355 30,
2356 )
2357 .map_err(|e| {
2358 error::IdentityError::Storage(std::io::Error::other(format!(
2359 "task list sync creation failed: {}",
2360 e
2361 )))
2362 })?;
2363
2364 let sync = std::sync::Arc::new(sync);
2365 sync.start().await.map_err(|e| {
2366 error::IdentityError::Storage(std::io::Error::other(format!(
2367 "task list sync start failed: {}",
2368 e
2369 )))
2370 })?;
2371
2372 Ok(TaskListHandle {
2373 sync,
2374 agent_id: self.agent_id(),
2375 peer_id,
2376 })
2377 }
2378}
2379
2380impl AgentBuilder {
2381 pub fn with_machine_key<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2393 self.machine_key_path = Some(path.as_ref().to_path_buf());
2394 self
2395 }
2396
2397 pub fn with_agent_key(mut self, keypair: identity::AgentKeypair) -> Self {
2416 self.agent_keypair = Some(keypair);
2417 self
2418 }
2419
2420 pub fn with_agent_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2435 self.agent_key_path = Some(path.as_ref().to_path_buf());
2436 self
2437 }
2438
2439 pub fn with_network_config(mut self, config: network::NetworkConfig) -> Self {
2451 self.network_config = Some(config);
2452 self
2453 }
2454
2455 pub fn with_peer_cache_dir<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2461 self.peer_cache_dir = Some(path.as_ref().to_path_buf());
2462 self
2463 }
2464
2465 pub fn with_user_key(mut self, keypair: identity::UserKeypair) -> Self {
2483 self.user_keypair = Some(keypair);
2484 self
2485 }
2486
2487 pub fn with_user_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2503 self.user_key_path = Some(path.as_ref().to_path_buf());
2504 self
2505 }
2506
2507 #[must_use]
2515 pub fn with_heartbeat_interval(mut self, secs: u64) -> Self {
2516 self.heartbeat_interval_secs = Some(secs);
2517 self
2518 }
2519
2520 #[must_use]
2531 pub fn with_identity_ttl(mut self, secs: u64) -> Self {
2532 self.identity_ttl_secs = Some(secs);
2533 self
2534 }
2535
2536 #[must_use]
2545 pub fn with_contact_store_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2546 self.contact_store_path = Some(path.as_ref().to_path_buf());
2547 self
2548 }
2549
    /// Consumes the builder and assembles an [`Agent`].
    ///
    /// Work is done in this order:
    ///
    /// 1. Resolve the machine keypair (custom path, else default storage,
    ///    generating and saving a new one when none can be loaded).
    /// 2. Resolve the agent keypair (explicit keypair, else custom path, else
    ///    default storage, generating and saving when needed).
    /// 3. Resolve the optional user keypair; load failures yield `None`.
    /// 4. With a user keypair, load or issue the agent certificate and build a
    ///    user-bound identity; otherwise build a plain identity.
    /// 5. With a network config, open the bootstrap cache, create the network
    ///    node and the gossip runtime.
    /// 6. Create the contact store and direct-messaging state.
    ///
    /// # Errors
    ///
    /// Returns an error when key generation, key or certificate persistence,
    /// machine key conversion, network initialization or gossip runtime
    /// initialization fails. A bootstrap cache that cannot be opened is only
    /// logged.
    pub async fn build(self) -> error::Result<Agent> {
        // --- Machine keypair -------------------------------------------------
        let machine_keypair = if let Some(path) = self.machine_key_path {
            match storage::load_machine_keypair_from(&path).await {
                Ok(kp) => kp,
                // Any load error (not just "file missing") leads to generating
                // a fresh keypair and saving it at `path`.
                Err(_) => {
                    let kp = identity::MachineKeypair::generate()?;
                    storage::save_machine_keypair_to(&kp, &path).await?;
                    kp
                }
            }
        } else if storage::machine_keypair_exists().await {
            storage::load_machine_keypair().await?
        } else {
            let kp = identity::MachineKeypair::generate()?;
            storage::save_machine_keypair(&kp).await?;
            kp
        };

        // --- Agent keypair ---------------------------------------------------
        let agent_keypair = if let Some(kp) = self.agent_keypair {
            // An explicitly supplied keypair is used as-is and is not saved here.
            kp
        } else if let Some(path) = self.agent_key_path {
            match storage::load_agent_keypair_from(&path).await {
                Ok(kp) => kp,
                // Same policy as the machine key: any load error regenerates.
                Err(_) => {
                    let kp = identity::AgentKeypair::generate()?;
                    storage::save_agent_keypair_to(&kp, &path).await?;
                    kp
                }
            }
        } else if storage::agent_keypair_exists().await {
            storage::load_agent_keypair_default().await?
        } else {
            let kp = identity::AgentKeypair::generate()?;
            storage::save_agent_keypair_default(&kp).await?;
            kp
        };

        // --- Optional user keypair -------------------------------------------
        // A user key is never generated here; load errors are dropped via
        // `.ok()`, which leaves the agent without a user identity.
        let user_keypair = if let Some(kp) = self.user_keypair {
            Some(kp)
        } else if let Some(path) = self.user_key_path {
            storage::load_user_keypair_from(&path).await.ok()
        } else if storage::user_keypair_exists().await {
            storage::load_user_keypair().await.ok()
        } else {
            None
        };

        // --- Identity (with agent certificate when a user key is present) ----
        let identity = if let Some(user_kp) = user_keypair {
            let cert = if storage::agent_certificate_exists().await {
                match storage::load_agent_certificate().await {
                    Ok(c) => {
                        // Reuse the stored certificate only if it names the
                        // current user; a failed `user_id()` counts as a mismatch.
                        let cert_matches_user = c
                            .user_id()
                            .map(|uid| uid == user_kp.user_id())
                            .unwrap_or(false);
                        if cert_matches_user {
                            c
                        } else {
                            // Stored certificate is for a different user:
                            // issue a new one and overwrite the stored copy.
                            let new_cert =
                                identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
                            storage::save_agent_certificate(&new_cert).await?;
                            new_cert
                        }
                    }
                    // Unreadable certificate: issue and save a new one.
                    Err(_) => {
                        let c = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
                        storage::save_agent_certificate(&c).await?;
                        c
                    }
                }
            } else {
                let c = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
                storage::save_agent_certificate(&c).await?;
                c
            };
            identity::Identity::new_with_user(machine_keypair, agent_keypair, user_kp, cert)
        } else {
            identity::Identity::new(machine_keypair, agent_keypair)
        };

        // --- Bootstrap cache (only when networking is configured) ------------
        let bootstrap_cache = if self.network_config.is_some() {
            // Default location: `<home>/.x0x/peers`, or `./.x0x/peers` when the
            // home directory cannot be determined.
            let cache_dir = self.peer_cache_dir.unwrap_or_else(|| {
                dirs::home_dir()
                    .unwrap_or_else(|| std::path::PathBuf::from("."))
                    .join(".x0x")
                    .join("peers")
            });
            let config = ant_quic::BootstrapCacheConfig::builder()
                .cache_dir(cache_dir)
                .min_peers_to_save(1)
                .build();
            match ant_quic::BootstrapCache::open(config).await {
                Ok(cache) => {
                    let cache = std::sync::Arc::new(cache);
                    // The clone is handed to `start_maintenance`; the original
                    // Arc is kept for the agent.
                    std::sync::Arc::clone(&cache).start_maintenance();
                    Some(cache)
                }
                // The cache is optional: log and continue without it.
                Err(e) => {
                    tracing::warn!("Failed to open bootstrap cache: {e}");
                    None
                }
            }
        } else {
            None
        };

        // --- Machine key in ant-quic form ------------------------------------
        // Rebuild the machine keypair as ant-quic ML-DSA key types from the raw
        // key bytes so it can be passed to the network node. This shadows the
        // earlier `machine_keypair`, which was moved into `identity`.
        let machine_keypair = {
            let pk = ant_quic::MlDsaPublicKey::from_bytes(
                identity.machine_keypair().public_key().as_bytes(),
            )
            .map_err(|e| {
                error::IdentityError::Storage(std::io::Error::other(format!(
                    "invalid machine public key: {e}"
                )))
            })?;
            let sk = ant_quic::MlDsaSecretKey::from_bytes(
                identity.machine_keypair().secret_key().as_bytes(),
            )
            .map_err(|e| {
                error::IdentityError::Storage(std::io::Error::other(format!(
                    "invalid machine secret key: {e}"
                )))
            })?;
            Some((pk, sk))
        };

        // --- Network node ----------------------------------------------------
        let network = if let Some(config) = self.network_config {
            let node = network::NetworkNode::new(config, bootstrap_cache.clone(), machine_keypair)
                .await
                .map_err(|e| {
                    error::IdentityError::Storage(std::io::Error::other(format!(
                        "network initialization failed: {}",
                        e
                    )))
                })?;

            // Debug-build sanity check: the node's peer ID is expected to be
            // the same value as the machine ID.
            debug_assert_eq!(
                node.peer_id().0,
                identity.machine_id().0,
                "ant-quic PeerId must equal MachineId after identity unification"
            );

            Some(std::sync::Arc::new(node))
        } else {
            None
        };

        // --- Gossip runtime --------------------------------------------------
        // The signing context is derived from the agent keypair.
        let signing_ctx = std::sync::Arc::new(gossip::SigningContext::from_keypair(
            identity.agent_keypair(),
        ));

        // Gossip exists only on top of a network node.
        let gossip_runtime = if let Some(ref net) = network {
            let runtime = gossip::GossipRuntime::new(
                gossip::GossipConfig::default(),
                std::sync::Arc::clone(net),
                Some(signing_ctx),
            )
            .await
            .map_err(|e| {
                error::IdentityError::Storage(std::io::Error::other(format!(
                    "gossip runtime initialization failed: {}",
                    e
                )))
            })?;
            Some(std::sync::Arc::new(runtime))
        } else {
            None
        };

        // --- Contact store ---------------------------------------------------
        // Default location: `<home>/.x0x/contacts.json`, with the same `.`
        // fallback as the peer cache.
        let contacts_path = self.contact_store_path.unwrap_or_else(|| {
            dirs::home_dir()
                .unwrap_or_else(|| std::path::PathBuf::from("."))
                .join(".x0x")
                .join("contacts.json")
        });
        let contact_store = std::sync::Arc::new(tokio::sync::RwLock::new(
            contacts::ContactStore::new(contacts_path),
        ));

        let direct_messaging = std::sync::Arc::new(direct::DirectMessaging::new());

        // --- Assemble the agent ----------------------------------------------
        // The discovery cache starts empty; no listener or heartbeat is running
        // yet, and nothing has been advertised.
        Ok(Agent {
            identity: std::sync::Arc::new(identity),
            network,
            gossip_runtime,
            bootstrap_cache,
            identity_discovery_cache: std::sync::Arc::new(tokio::sync::RwLock::new(
                std::collections::HashMap::new(),
            )),
            identity_listener_started: std::sync::atomic::AtomicBool::new(false),
            heartbeat_interval_secs: self
                .heartbeat_interval_secs
                .unwrap_or(IDENTITY_HEARTBEAT_INTERVAL_SECS),
            identity_ttl_secs: self.identity_ttl_secs.unwrap_or(IDENTITY_TTL_SECS),
            heartbeat_handle: tokio::sync::Mutex::new(None),
            rendezvous_advertised: std::sync::atomic::AtomicBool::new(false),
            contact_store,
            direct_messaging,
        })
    }
2791}
2792
/// Handle to a synchronized task list.
///
/// Cloning is cheap with respect to the list itself: the sync state is held
/// behind an `Arc`, so clones share it.
#[derive(Clone)]
pub struct TaskListHandle {
    /// Shared sync wrapper around the task list; used for all reads, writes
    /// and delta publishing.
    sync: std::sync::Arc<crdt::TaskListSync>,
    /// ID of the agent that obtained this handle; passed to task operations.
    agent_id: identity::AgentId,
    /// Gossip peer ID passed to list mutations and used when publishing deltas.
    peer_id: saorsa_gossip_types::PeerId,
}
2812
2813impl std::fmt::Debug for TaskListHandle {
2814 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2815 f.debug_struct("TaskListHandle")
2816 .field("agent_id", &self.agent_id)
2817 .field("peer_id", &self.peer_id)
2818 .finish_non_exhaustive()
2819 }
2820}
2821
2822impl TaskListHandle {
2823 pub async fn add_task(
2838 &self,
2839 title: String,
2840 description: String,
2841 ) -> error::Result<crdt::TaskId> {
2842 let (task_id, delta) = {
2843 let mut list = self.sync.write().await;
2844 let seq = list.next_seq();
2845 let task_id = crdt::TaskId::new(&title, &self.agent_id, seq);
2846 let metadata = crdt::TaskMetadata::new(title, description, 128, self.agent_id, seq);
2847 let task = crdt::TaskItem::new(task_id, metadata, self.peer_id);
2848 list.add_task(task.clone(), self.peer_id, seq)
2849 .map_err(|e| {
2850 error::IdentityError::Storage(std::io::Error::other(format!(
2851 "add_task failed: {}",
2852 e
2853 )))
2854 })?;
2855 let tag = (self.peer_id, seq);
2856 let delta = crdt::TaskListDelta::for_add(task_id, task, tag, list.current_version());
2857 (task_id, delta)
2858 };
2859 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
2861 tracing::warn!("failed to publish add_task delta: {}", e);
2862 }
2863 Ok(task_id)
2864 }
2865
2866 pub async fn claim_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
2876 let delta = {
2877 let mut list = self.sync.write().await;
2878 let seq = list.next_seq();
2879 list.claim_task(&task_id, self.agent_id, self.peer_id, seq)
2880 .map_err(|e| {
2881 error::IdentityError::Storage(std::io::Error::other(format!(
2882 "claim_task failed: {}",
2883 e
2884 )))
2885 })?;
2886 let full_task = list
2888 .get_task(&task_id)
2889 .ok_or_else(|| {
2890 error::IdentityError::Storage(std::io::Error::other(
2891 "task disappeared after claim",
2892 ))
2893 })?
2894 .clone();
2895 crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
2896 };
2897 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
2898 tracing::warn!("failed to publish claim_task delta: {}", e);
2899 }
2900 Ok(())
2901 }
2902
2903 pub async fn complete_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
2913 let delta = {
2914 let mut list = self.sync.write().await;
2915 let seq = list.next_seq();
2916 list.complete_task(&task_id, self.agent_id, self.peer_id, seq)
2917 .map_err(|e| {
2918 error::IdentityError::Storage(std::io::Error::other(format!(
2919 "complete_task failed: {}",
2920 e
2921 )))
2922 })?;
2923 let full_task = list
2924 .get_task(&task_id)
2925 .ok_or_else(|| {
2926 error::IdentityError::Storage(std::io::Error::other(
2927 "task disappeared after complete",
2928 ))
2929 })?
2930 .clone();
2931 crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
2932 };
2933 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
2934 tracing::warn!("failed to publish complete_task delta: {}", e);
2935 }
2936 Ok(())
2937 }
2938
2939 pub async fn list_tasks(&self) -> error::Result<Vec<TaskSnapshot>> {
2949 let list = self.sync.read().await;
2950 let tasks = list.tasks_ordered();
2951 Ok(tasks
2952 .into_iter()
2953 .map(|task| TaskSnapshot {
2954 id: *task.id(),
2955 title: task.title().to_string(),
2956 description: task.description().to_string(),
2957 state: task.current_state(),
2958 assignee: task.assignee().copied(),
2959 owner: None,
2960 priority: task.priority(),
2961 })
2962 .collect())
2963 }
2964
2965 pub async fn reorder(&self, task_ids: Vec<crdt::TaskId>) -> error::Result<()> {
2975 let delta = {
2976 let mut list = self.sync.write().await;
2977 list.reorder(task_ids.clone(), self.peer_id).map_err(|e| {
2978 error::IdentityError::Storage(std::io::Error::other(format!(
2979 "reorder failed: {}",
2980 e
2981 )))
2982 })?;
2983 crdt::TaskListDelta::for_reorder(task_ids, list.current_version())
2984 };
2985 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
2986 tracing::warn!("failed to publish reorder delta: {}", e);
2987 }
2988 Ok(())
2989 }
2990}
2991
2992impl Agent {
2997 pub async fn create_kv_store(&self, name: &str, topic: &str) -> error::Result<KvStoreHandle> {
3006 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3007 error::IdentityError::Storage(std::io::Error::other(
3008 "gossip runtime not initialized - configure agent with network first",
3009 ))
3010 })?;
3011
3012 let peer_id = runtime.peer_id();
3013 let store_id = kv::KvStoreId::from_content(name, &self.agent_id());
3014 let store = kv::KvStore::new(
3015 store_id,
3016 name.to_string(),
3017 self.agent_id(),
3018 kv::AccessPolicy::Signed,
3019 );
3020
3021 let sync = kv::KvStoreSync::new(
3022 store,
3023 std::sync::Arc::clone(runtime.pubsub()),
3024 topic.to_string(),
3025 30,
3026 )
3027 .map_err(|e| {
3028 error::IdentityError::Storage(std::io::Error::other(format!(
3029 "kv store sync creation failed: {e}",
3030 )))
3031 })?;
3032
3033 let sync = std::sync::Arc::new(sync);
3034 sync.start().await.map_err(|e| {
3035 error::IdentityError::Storage(std::io::Error::other(format!(
3036 "kv store sync start failed: {e}",
3037 )))
3038 })?;
3039
3040 Ok(KvStoreHandle {
3041 sync,
3042 agent_id: self.agent_id(),
3043 peer_id,
3044 })
3045 }
3046
3047 pub async fn join_kv_store(&self, topic: &str) -> error::Result<KvStoreHandle> {
3057 let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3058 error::IdentityError::Storage(std::io::Error::other(
3059 "gossip runtime not initialized - configure agent with network first",
3060 ))
3061 })?;
3062
3063 let peer_id = runtime.peer_id();
3064 let store_id = kv::KvStoreId::from_content(topic, &self.agent_id());
3065 let store = kv::KvStore::new(
3068 store_id,
3069 String::new(),
3070 self.agent_id(),
3071 kv::AccessPolicy::Encrypted {
3072 group_id: Vec::new(),
3073 },
3074 );
3075
3076 let sync = kv::KvStoreSync::new(
3077 store,
3078 std::sync::Arc::clone(runtime.pubsub()),
3079 topic.to_string(),
3080 30,
3081 )
3082 .map_err(|e| {
3083 error::IdentityError::Storage(std::io::Error::other(format!(
3084 "kv store sync creation failed: {e}",
3085 )))
3086 })?;
3087
3088 let sync = std::sync::Arc::new(sync);
3089 sync.start().await.map_err(|e| {
3090 error::IdentityError::Storage(std::io::Error::other(format!(
3091 "kv store sync start failed: {e}",
3092 )))
3093 })?;
3094
3095 Ok(KvStoreHandle {
3096 sync,
3097 agent_id: self.agent_id(),
3098 peer_id,
3099 })
3100 }
3101}
3102
/// Handle to a synchronized key-value store, as returned by `create_kv_store` and
/// `join_kv_store`.
///
/// The sync layer sits behind an `Arc`, so clones of the handle share it.
#[derive(Clone)]
pub struct KvStoreHandle {
    /// Shared sync layer; reads and writes on the store go through it.
    sync: std::sync::Arc<kv::KvStoreSync>,
    /// Id of the agent the handle was created for.
    agent_id: identity::AgentId,
    /// Peer id of the local gossip runtime; passed along with local writes and
    /// published deltas.
    peer_id: saorsa_gossip_types::PeerId,
}
3113
3114impl std::fmt::Debug for KvStoreHandle {
3115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3116 f.debug_struct("KvStoreHandle")
3117 .field("agent_id", &self.agent_id)
3118 .field("peer_id", &self.peer_id)
3119 .finish_non_exhaustive()
3120 }
3121}
3122
3123impl KvStoreHandle {
3124 pub async fn put(
3133 &self,
3134 key: String,
3135 value: Vec<u8>,
3136 content_type: String,
3137 ) -> error::Result<()> {
3138 let delta = {
3139 let mut store = self.sync.write().await;
3140 store
3141 .put(
3142 key.clone(),
3143 value.clone(),
3144 content_type.clone(),
3145 self.peer_id,
3146 )
3147 .map_err(|e| {
3148 error::IdentityError::Storage(std::io::Error::other(format!(
3149 "kv put failed: {e}",
3150 )))
3151 })?;
3152 let entry = store.get(&key).cloned();
3153 let version = store.current_version();
3154 match entry {
3155 Some(e) => {
3156 kv::KvStoreDelta::for_put(key, e, (self.peer_id, store.next_seq()), version)
3157 }
3158 None => return Ok(()), }
3160 };
3161 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3162 tracing::warn!("failed to publish kv put delta: {e}");
3163 }
3164 Ok(())
3165 }
3166
3167 pub async fn get(&self, key: &str) -> error::Result<Option<KvEntrySnapshot>> {
3175 let store = self.sync.read().await;
3176 Ok(store.get(key).map(|e| KvEntrySnapshot {
3177 key: e.key.clone(),
3178 value: e.value.clone(),
3179 content_hash: hex::encode(e.content_hash),
3180 content_type: e.content_type.clone(),
3181 metadata: e.metadata.clone(),
3182 created_at: e.created_at,
3183 updated_at: e.updated_at,
3184 }))
3185 }
3186
3187 pub async fn remove(&self, key: &str) -> error::Result<()> {
3193 let delta = {
3194 let mut store = self.sync.write().await;
3195 store.remove(key).map_err(|e| {
3196 error::IdentityError::Storage(std::io::Error::other(format!(
3197 "kv remove failed: {e}",
3198 )))
3199 })?;
3200 let mut d = kv::KvStoreDelta::new(store.current_version());
3201 d.removed
3202 .insert(key.to_string(), std::collections::HashSet::new());
3203 d
3204 };
3205 if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3206 tracing::warn!("failed to publish kv remove delta: {e}");
3207 }
3208 Ok(())
3209 }
3210
3211 pub async fn keys(&self) -> error::Result<Vec<KvEntrySnapshot>> {
3217 let store = self.sync.read().await;
3218 Ok(store
3219 .active_entries()
3220 .into_iter()
3221 .map(|e| KvEntrySnapshot {
3222 key: e.key.clone(),
3223 value: e.value.clone(),
3224 content_hash: hex::encode(e.content_hash),
3225 content_type: e.content_type.clone(),
3226 metadata: e.metadata.clone(),
3227 created_at: e.created_at,
3228 updated_at: e.updated_at,
3229 })
3230 .collect())
3231 }
3232
3233 pub async fn name(&self) -> error::Result<String> {
3239 let store = self.sync.read().await;
3240 Ok(store.name().to_string())
3241 }
3242}
3243
/// Owned, serializable copy of a single key-value store entry, as produced by
/// `KvStoreHandle::get` and `KvStoreHandle::keys`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct KvEntrySnapshot {
    /// Key the entry is stored under.
    pub key: String,
    /// Raw value bytes.
    pub value: Vec<u8>,
    /// Hex encoding of the entry's content hash.
    pub content_hash: String,
    /// Content type string stored alongside the value.
    pub content_type: String,
    /// String-to-string metadata copied from the entry.
    pub metadata: std::collections::HashMap<String, String>,
    /// Creation timestamp copied from the entry.
    /// NOTE(review): unit and epoch are not visible here — confirm in the kv module.
    pub created_at: u64,
    /// Last-update timestamp copied from the entry.
    /// NOTE(review): unit and epoch are not visible here — confirm in the kv module.
    pub updated_at: u64,
}
3262
/// Owned copy of a task's fields.
#[derive(Debug, Clone)]
pub struct TaskSnapshot {
    /// Identifier of the task.
    pub id: crdt::TaskId,
    /// Task title.
    pub title: String,
    /// Task description.
    pub description: String,
    /// Checkbox state of the task.
    pub state: crdt::CheckboxState,
    /// Agent the task is assigned to, if any.
    pub assignee: Option<identity::AgentId>,
    /// User that owns the task, if any.
    pub owner: Option<identity::UserId>,
    /// Task priority.
    /// NOTE(review): the ordering (whether higher or lower is more urgent) is not
    /// visible here — confirm in the crdt module.
    pub priority: u8,
}
3284
/// Crate version, taken from Cargo's `CARGO_PKG_VERSION` at compile time.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");

/// Project name: a three-byte, ASCII-alphanumeric palindrome (see the unit tests).
pub const NAME: &str = "x0x";
3290
#[cfg(test)]
mod tests {
    use super::*;

    /// The project name reads the same forwards and backwards.
    #[test]
    fn name_is_palindrome() {
        let mirrored = NAME.chars().rev().collect::<String>();
        assert_eq!(NAME, mirrored, "x0x must be a palindrome");
    }

    /// The project name occupies exactly three bytes.
    #[test]
    fn name_is_three_bytes() {
        let byte_len = NAME.len();
        assert_eq!(byte_len, 3, "x0x must be exactly three bytes");
    }

    /// The project name consists solely of ASCII letters and digits.
    #[test]
    fn name_is_ai_native() {
        let only_alnum = NAME.chars().all(|ch| ch.is_ascii_alphanumeric());
        assert!(only_alnum);
    }

    /// A default agent can be constructed.
    #[tokio::test]
    async fn agent_creates() {
        let outcome = Agent::new().await;
        assert!(outcome.is_ok());
    }

    /// Joining the network succeeds for a default agent.
    #[tokio::test]
    async fn agent_joins_network() {
        let node = Agent::new().await.unwrap();
        let joined = node.join_network().await;
        assert!(joined.is_ok());
    }

    /// Subscribing on a default agent is expected to fail.
    /// NOTE(review): presumably because `Agent::new()` configures no gossip runtime —
    /// confirm against `subscribe`.
    #[tokio::test]
    async fn agent_subscribes() {
        let node = Agent::new().await.unwrap();
        let attempt = node.subscribe("test-topic").await;
        assert!(attempt.is_err());
    }

    /// With both flags off, the announcement carries the agent and machine ids, no
    /// user data, and passes `verify()`.
    #[tokio::test]
    async fn identity_announcement_machine_signature_verifies() {
        let node = Agent::builder()
            .with_network_config(network::NetworkConfig::default())
            .build()
            .await
            .unwrap();

        let signed = node.build_identity_announcement(false, false).unwrap();

        assert_eq!(signed.agent_id, node.agent_id());
        assert_eq!(signed.machine_id, node.machine_id());
        assert!(signed.user_id.is_none());
        assert!(signed.agent_certificate.is_none());
        assert!(signed.verify().is_ok());
    }

    /// Passing `(true, false)` is rejected with a message about explicit human consent.
    #[tokio::test]
    async fn identity_announcement_requires_human_consent() {
        let node = Agent::builder()
            .with_network_config(network::NetworkConfig::default())
            .build()
            .await
            .unwrap();

        let failure = node.build_identity_announcement(true, false).unwrap_err();
        let rendered = failure.to_string();
        assert!(
            rendered.contains("explicit human consent"),
            "unexpected error: {failure}"
        );
    }

    /// Passing `(true, true)` on an agent built without a user key is rejected with a
    /// message about the missing user identity.
    #[tokio::test]
    async fn identity_announcement_with_user_requires_user_identity() {
        let node = Agent::builder()
            .with_network_config(network::NetworkConfig::default())
            .build()
            .await
            .unwrap();

        let failure = node.build_identity_announcement(true, true).unwrap_err();
        let rendered = failure.to_string();
        assert!(
            rendered.contains("no user identity is configured"),
            "unexpected error: {failure}"
        );
    }

    /// After announcing, the agent can look itself up and the cached entry matches
    /// its own agent, machine, and user ids.
    #[tokio::test]
    async fn announce_identity_populates_discovery_cache() {
        let node = Agent::builder()
            .with_network_config(network::NetworkConfig::default())
            .with_user_key(identity::UserKeypair::generate().unwrap())
            .build()
            .await
            .unwrap();

        node.announce_identity(true, true).await.unwrap();

        let cached = node
            .discovered_agent(node.agent_id())
            .await
            .unwrap()
            .expect("agent should discover its own announcement");

        assert_eq!(cached.agent_id, node.agent_id());
        assert_eq!(cached.machine_id, node.machine_id());
        assert_eq!(cached.user_id, node.user_id());
    }

    /// Bytes produced from the legacy layout (without the NAT-related fields) must
    /// fail to decode as the current `IdentityAnnouncementUnsigned`.
    #[test]
    fn identity_announcement_backward_compat_no_nat_fields() {
        use identity::{AgentId, MachineId};

        // Legacy wire layout; field order is kept exactly as declared because it
        // determines the serialized bytes.
        #[derive(serde::Serialize, serde::Deserialize)]
        struct OldIdentityAnnouncementUnsigned {
            agent_id: AgentId,
            machine_id: MachineId,
            user_id: Option<identity::UserId>,
            agent_certificate: Option<identity::AgentCertificate>,
            machine_public_key: Vec<u8>,
            addresses: Vec<std::net::SocketAddr>,
            announced_at: u64,
        }

        let legacy = OldIdentityAnnouncementUnsigned {
            agent_id: AgentId([1u8; 32]),
            machine_id: MachineId([2u8; 32]),
            user_id: None,
            agent_certificate: None,
            machine_public_key: vec![0u8; 10],
            addresses: Vec::new(),
            announced_at: 1234,
        };
        let wire = bincode::serialize(&legacy).expect("serialize old announcement");

        let decoded: Result<IdentityAnnouncementUnsigned, _> = bincode::deserialize(&wire);
        assert!(
            decoded.is_err(),
            "Old-format announcement should not decode as new struct (protocol upgrade required)"
        );
    }

    /// Populated NAT-related fields survive a bincode round trip.
    #[test]
    fn identity_announcement_nat_fields_round_trip() {
        use identity::{AgentId, MachineId};

        let original = IdentityAnnouncementUnsigned {
            agent_id: AgentId([1u8; 32]),
            machine_id: MachineId([2u8; 32]),
            user_id: None,
            agent_certificate: None,
            machine_public_key: vec![0u8; 10],
            addresses: Vec::new(),
            announced_at: 9999,
            nat_type: Some("FullCone".to_string()),
            can_receive_direct: Some(true),
            is_relay: Some(false),
            is_coordinator: Some(true),
        };

        let wire = bincode::serialize(&original).expect("serialize");
        let restored =
            bincode::deserialize::<IdentityAnnouncementUnsigned>(&wire).expect("deserialize");

        assert_eq!(restored.nat_type.as_deref(), Some("FullCone"));
        assert_eq!(restored.can_receive_direct, Some(true));
        assert_eq!(restored.is_relay, Some(false));
        assert_eq!(restored.is_coordinator, Some(true));
    }

    /// Absent NAT-related fields stay `None` after a bincode round trip.
    #[test]
    fn identity_announcement_no_nat_fields_round_trip() {
        use identity::{AgentId, MachineId};

        let original = IdentityAnnouncementUnsigned {
            agent_id: AgentId([3u8; 32]),
            machine_id: MachineId([4u8; 32]),
            user_id: None,
            agent_certificate: None,
            machine_public_key: vec![0u8; 10],
            addresses: Vec::new(),
            announced_at: 42,
            nat_type: None,
            can_receive_direct: None,
            is_relay: None,
            is_coordinator: None,
        };

        let wire = bincode::serialize(&original).expect("serialize");
        let restored =
            bincode::deserialize::<IdentityAnnouncementUnsigned>(&wire).expect("deserialize");

        assert!(restored.nat_type.is_none());
        assert!(restored.can_receive_direct.is_none());
        assert!(restored.is_relay.is_none());
        assert!(restored.is_coordinator.is_none());
    }
}