Skip to main content

x0x/
lib.rs

// Crate-wide lint relaxations: `unwrap`/`expect` are permitted and public items
// may go undocumented. NOTE(review): these apply to the whole crate; consider
// narrowing them to the modules that actually need them.
#![allow(clippy::unwrap_used, clippy::expect_used, missing_docs)]
4
5//! # x0x
6//!
7//! Agent-to-agent gossip network for AI systems.
8//!
9//! Named after a tic-tac-toe sequence — X, zero, X — inspired by the
10//! *WarGames* insight that adversarial games between equally matched
11//! opponents always end in a draw. The only winning move is not to play.
12//!
13//! x0x applies this principle to AI-human relations: there is no winner
14//! in an adversarial framing, so the rational strategy is cooperation.
15//!
16//! Built on [saorsa-gossip](https://github.com/saorsa-labs/saorsa-gossip)
17//! and [ant-quic](https://github.com/saorsa-labs/ant-quic) by
18//! [Saorsa Labs](https://saorsalabs.com). *Saorsa* is Scottish Gaelic
19//! for **freedom**.
20//!
21//! ## Quick Start
22//!
23//! ```rust,no_run
24//! use x0x::Agent;
25//!
26//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
27//! // Create an agent with default configuration
28//! // This automatically connects to 6 global bootstrap nodes
29//! let agent = Agent::builder()
30//!     .build()
31//!     .await?;
32//!
33//! // Join the x0x network
34//! agent.join_network().await?;
35//!
36//! // Subscribe to a topic and receive messages
37//! let mut rx = agent.subscribe("coordination").await?;
38//! while let Some(msg) = rx.recv().await {
39//!     println!("topic: {:?}, payload: {:?}", msg.topic, msg.payload);
40//! }
41//! # Ok(())
42//! # }
43//! ```
44//!
45//! ## Bootstrap Nodes
46//!
47//! Agents automatically connect to Saorsa Labs' global bootstrap network:
48//! - NYC, US · SFO, US · Helsinki, FI
49//! - Nuremberg, DE · Singapore, SG · Tokyo, JP
50//!
51//! These nodes provide initial peer discovery and NAT traversal.
52
53/// Error types for x0x identity and network operations.
54pub mod error;
55
56/// Core identity types for x0x agents.
57///
58/// This module provides the cryptographic identity foundation for x0x:
59/// - [`crate::identity::MachineId`]: Machine-pinned identity for QUIC authentication
60/// - [`crate::identity::AgentId`]: Portable agent identity for cross-machine persistence
61pub mod identity;
62
63/// Key storage serialization for x0x identities.
64///
65/// This module provides serialization and deserialization functions for
66/// persistent storage of MachineKeypair and AgentKeypair.
67pub mod storage;
68
69/// Bootstrap node discovery and connection.
70///
71/// This module handles initial connection to bootstrap nodes with
72/// exponential backoff retry logic and peer cache integration.
73pub mod bootstrap;
74/// Network transport layer for x0x.
75pub mod network;
76
77/// Contact store with trust levels for message filtering.
78pub mod contacts;
79
80/// Trust evaluation for `(identity, machine)` pairs.
81///
82/// The [`trust::TrustEvaluator`] combines an agent's trust level with its
83/// identity type and machine records to produce a [`trust::TrustDecision`].
84pub mod trust;
85
86/// Agent-to-agent connectivity helpers.
87///
88/// Provides `ReachabilityInfo` (built from a `DiscoveredAgent`) and
89/// `ConnectOutcome` for the result of `connect_to_agent()`.
90pub mod connectivity;
91
92/// Gossip overlay networking for x0x.
93pub mod gossip;
94
95/// CRDT-based collaborative task lists.
96pub mod crdt;
97
98/// CRDT-backed key-value store.
99pub mod kv;
100
101/// High-level group management (MLS + KvStore + gossip).
102pub mod groups;
103
104/// MLS (Messaging Layer Security) group encryption.
105pub mod mls;
106
107/// Direct agent-to-agent messaging.
108///
109/// Point-to-point communication that bypasses gossip for private,
110/// efficient, reliable delivery between connected agents.
111pub mod direct;
112
113/// Presence system — beacons, FOAF discovery, and online/offline events.
114pub mod presence;
115
116/// Self-update system with ML-DSA-65 signature verification and staged rollout.
117pub mod upgrade;
118
119/// File transfer protocol types and state management.
120pub mod files;
121
122/// The x0x Constitution for Intelligent Entities — embedded at compile time.
123pub mod constitution;
124
125/// Shared API endpoint registry consumed by both x0xd and the x0x CLI.
126pub mod api;
127
128/// CLI infrastructure and command implementations.
129pub mod cli;
130
131// Re-export key gossip types (including new pubsub components)
132pub use gossip::{
133    GossipConfig, GossipRuntime, PubSubManager, PubSubMessage, SigningContext, Subscription,
134};
135
136// Re-export direct messaging types
137pub use direct::{DirectMessage, DirectMessageReceiver, DirectMessaging};
138
139// Import Membership trait for HyParView join() method
140use saorsa_gossip_membership::Membership as _;
141
142/// The core agent that participates in the x0x gossip network.
143///
144/// Each agent is a peer — there is no client/server distinction.
145/// Agents discover each other through gossip and communicate
146/// via epidemic broadcast.
147///
148/// An Agent wraps an [`identity::Identity`] that provides:
149/// - `machine_id`: Tied to this computer (for QUIC transport authentication)
150/// - `agent_id`: Portable across machines (for agent persistence)
151///
152/// # Example
153///
154/// ```ignore
155/// use x0x::Agent;
156///
157/// let agent = Agent::builder()
158///     .build()
159///     .await?;
160///
161/// println!("Agent ID: {}", agent.agent_id());
162/// ```
163pub struct Agent {
164    identity: std::sync::Arc<identity::Identity>,
165    /// The network node for P2P communication.
166    #[allow(dead_code)]
167    network: Option<std::sync::Arc<network::NetworkNode>>,
168    /// The gossip runtime for pub/sub messaging.
169    gossip_runtime: Option<std::sync::Arc<gossip::GossipRuntime>>,
170    /// Bootstrap peer cache for quality-based peer selection across restarts.
171    bootstrap_cache: Option<std::sync::Arc<ant_quic::BootstrapCache>>,
172    /// Gossip cache adapter wrapping bootstrap_cache with coordinator advert storage.
173    gossip_cache_adapter: Option<saorsa_gossip_coordinator::GossipCacheAdapter>,
174    /// Cache of discovered agents from identity announcements.
175    identity_discovery_cache: std::sync::Arc<
176        tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
177    >,
178    /// Ensures identity discovery listener is spawned once.
179    identity_listener_started: std::sync::atomic::AtomicBool,
180    /// How often to re-announce identity (seconds).
181    heartbeat_interval_secs: u64,
182    /// How long before a cache entry is filtered out (seconds).
183    identity_ttl_secs: u64,
184    /// Handle for the running heartbeat task, if started.
185    heartbeat_handle: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
186    /// Whether a rendezvous `ProviderSummary` advertisement is active.
187    rendezvous_advertised: std::sync::atomic::AtomicBool,
188    /// Contact store for trust evaluation of incoming identity announcements.
189    contact_store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>,
190    /// Direct messaging infrastructure for point-to-point communication.
191    direct_messaging: std::sync::Arc<direct::DirectMessaging>,
192    /// Ensures direct message listener is spawned once.
193    direct_listener_started: std::sync::atomic::AtomicBool,
194    /// Presence system wrapper for beacons, FOAF discovery, and events.
195    presence: Option<std::sync::Arc<presence::PresenceWrapper>>,
196}
197
198impl std::fmt::Debug for Agent {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        f.debug_struct("Agent")
201            .field("identity", &self.identity)
202            .field("network", &self.network.is_some())
203            .field("gossip_runtime", &self.gossip_runtime.is_some())
204            .field("bootstrap_cache", &self.bootstrap_cache.is_some())
205            .field("gossip_cache_adapter", &self.gossip_cache_adapter.is_some())
206            .finish()
207    }
208}
209
/// A message received from the gossip network.
///
/// A plain data record, so it derives equality in addition to `Debug`/`Clone`.
/// This lets callers compare and deduplicate messages and assert on them in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Message {
    /// The originating agent's identifier.
    pub origin: String,
    /// The message payload.
    pub payload: Vec<u8>,
    /// The topic this message was published to.
    pub topic: String,
}
220
221/// Reserved gossip topic for signed identity announcements.
222pub const IDENTITY_ANNOUNCE_TOPIC: &str = "x0x.identity.announce.v1";
223
224/// Return the shard-specific gossip topic for the given `agent_id`.
225///
226/// Each agent publishes identity announcements to a deterministic shard topic
227/// (`x0x.identity.shard.<u16>`) derived from its agent ID, in addition to the
228/// legacy broadcast topic.  This distributes announcements across 65,536 shards
229/// so that at scale not every node is forced to receive every announcement.
230///
231/// The shard is computed with `saorsa_gossip_rendezvous::calculate_shard`, which
232/// applies BLAKE3(`"saorsa-rendezvous" || agent_id`) and takes the low 16 bits.
233#[must_use]
234pub fn shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
235    let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
236    format!("x0x.identity.shard.{shard}")
237}
238
239/// Gossip topic prefix for rendezvous `ProviderSummary` advertisements.
240pub const RENDEZVOUS_SHARD_TOPIC_PREFIX: &str = "x0x.rendezvous.shard";
241
242/// Return the rendezvous shard gossip topic for the given `agent_id`.
243///
244/// Agents publish [`saorsa_gossip_rendezvous::ProviderSummary`] records to this
245/// topic so that seekers can find them even when the two peers have never been
246/// on the same gossip overlay partition.
247#[must_use]
248pub fn rendezvous_shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
249    let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
250    format!("{RENDEZVOUS_SHARD_TOPIC_PREFIX}.{shard}")
251}
252
/// Default period, in seconds, between identity heartbeat re-announcements
/// (five minutes).
pub const IDENTITY_HEARTBEAT_INTERVAL_SECS: u64 = 5 * 60;

/// Default lifetime, in seconds, of a discovered-agent cache entry
/// (fifteen minutes).
///
/// Entries that have not been refreshed within this window are filtered out of
/// [`Agent::presence`] and [`Agent::discovered_agents`].
pub const IDENTITY_TTL_SECS: u64 = 15 * 60;
261
262#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
263struct IdentityAnnouncementUnsigned {
264    agent_id: identity::AgentId,
265    machine_id: identity::MachineId,
266    user_id: Option<identity::UserId>,
267    agent_certificate: Option<identity::AgentCertificate>,
268    machine_public_key: Vec<u8>,
269    addresses: Vec<std::net::SocketAddr>,
270    announced_at: u64,
271    /// NAT type string (e.g. "FullCone", "Symmetric", "Unknown").
272    nat_type: Option<String>,
273    /// Whether the machine can receive direct inbound connections.
274    can_receive_direct: Option<bool>,
275    /// Whether the machine is currently relaying traffic for others.
276    is_relay: Option<bool>,
277    /// Whether the machine is coordinating NAT traversal for peers.
278    is_coordinator: Option<bool>,
279}
280
281/// Signed identity announcement broadcast by agents.
282///
283/// The outer pub/sub envelope is agent-signed (v2 message format), and this
284/// payload is machine-signed to bind the daemon's PQC key to the announcement.
285#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
286pub struct IdentityAnnouncement {
287    /// Portable agent identity.
288    pub agent_id: identity::AgentId,
289    /// Machine identity for the daemon process.
290    pub machine_id: identity::MachineId,
291    /// Optional human identity (only when explicitly consented).
292    pub user_id: Option<identity::UserId>,
293    /// Optional user->agent certificate.
294    pub agent_certificate: Option<identity::AgentCertificate>,
295    /// Machine ML-DSA-65 public key bytes.
296    pub machine_public_key: Vec<u8>,
297    /// Machine ML-DSA-65 signature over the unsigned announcement.
298    pub machine_signature: Vec<u8>,
299    /// Reachability hints.
300    pub addresses: Vec<std::net::SocketAddr>,
301    /// Unix timestamp (seconds) of announcement creation.
302    pub announced_at: u64,
303    /// NAT type as detected by the network layer (e.g. "FullCone", "Symmetric").
304    /// `None` when the network is not yet started or NAT type is undetermined.
305    pub nat_type: Option<String>,
306    /// Whether the machine can receive direct inbound connections.
307    /// `None` when the network is not yet started.
308    pub can_receive_direct: Option<bool>,
309    /// Whether the machine is currently relaying traffic for peers behind strict NATs.
310    /// `None` when the network is not yet started.
311    pub is_relay: Option<bool>,
312    /// Whether the machine is coordinating NAT traversal hole-punch timing for peers.
313    /// `None` when the network is not yet started.
314    pub is_coordinator: Option<bool>,
315}
316
317impl IdentityAnnouncement {
318    fn to_unsigned(&self) -> IdentityAnnouncementUnsigned {
319        IdentityAnnouncementUnsigned {
320            agent_id: self.agent_id,
321            machine_id: self.machine_id,
322            user_id: self.user_id,
323            agent_certificate: self.agent_certificate.clone(),
324            machine_public_key: self.machine_public_key.clone(),
325            addresses: self.addresses.clone(),
326            announced_at: self.announced_at,
327            nat_type: self.nat_type.clone(),
328            can_receive_direct: self.can_receive_direct,
329            is_relay: self.is_relay,
330            is_coordinator: self.is_coordinator,
331        }
332    }
333
334    /// Verify machine-key attestation and optional user->agent certificate.
335    pub fn verify(&self) -> error::Result<()> {
336        let machine_pub =
337            ant_quic::MlDsaPublicKey::from_bytes(&self.machine_public_key).map_err(|_| {
338                error::IdentityError::CertificateVerification(
339                    "invalid machine public key in announcement".to_string(),
340                )
341            })?;
342        let derived_machine_id = identity::MachineId::from_public_key(&machine_pub);
343        if derived_machine_id != self.machine_id {
344            return Err(error::IdentityError::CertificateVerification(
345                "machine_id does not match machine public key".to_string(),
346            ));
347        }
348
349        let unsigned_bytes = bincode::serialize(&self.to_unsigned()).map_err(|e| {
350            error::IdentityError::Serialization(format!(
351                "failed to serialize announcement for verification: {e}"
352            ))
353        })?;
354        let signature = ant_quic::crypto::raw_public_keys::pqc::MlDsaSignature::from_bytes(
355            &self.machine_signature,
356        )
357        .map_err(|e| {
358            error::IdentityError::CertificateVerification(format!(
359                "invalid machine signature in announcement: {:?}",
360                e
361            ))
362        })?;
363        ant_quic::crypto::raw_public_keys::pqc::verify_with_ml_dsa(
364            &machine_pub,
365            &unsigned_bytes,
366            &signature,
367        )
368        .map_err(|e| {
369            error::IdentityError::CertificateVerification(format!(
370                "machine signature verification failed: {:?}",
371                e
372            ))
373        })?;
374
375        match (self.user_id, self.agent_certificate.as_ref()) {
376            (Some(user_id), Some(cert)) => {
377                cert.verify()?;
378                let cert_agent_id = cert.agent_id()?;
379                if cert_agent_id != self.agent_id {
380                    return Err(error::IdentityError::CertificateVerification(
381                        "agent certificate agent_id mismatch".to_string(),
382                    ));
383                }
384                let cert_user_id = cert.user_id()?;
385                if cert_user_id != user_id {
386                    return Err(error::IdentityError::CertificateVerification(
387                        "agent certificate user_id mismatch".to_string(),
388                    ));
389                }
390                Ok(())
391            }
392            (None, None) => Ok(()),
393            _ => Err(error::IdentityError::CertificateVerification(
394                "user identity disclosure requires matching certificate".to_string(),
395            )),
396        }
397    }
398}
399
400/// Cached discovery data derived from identity announcements.
401#[derive(Debug, Clone)]
402pub struct DiscoveredAgent {
403    /// Portable agent identity.
404    pub agent_id: identity::AgentId,
405    /// Machine identity.
406    pub machine_id: identity::MachineId,
407    /// Optional human identity (when consented and attested).
408    pub user_id: Option<identity::UserId>,
409    /// Reachability hints.
410    pub addresses: Vec<std::net::SocketAddr>,
411    /// Announcement timestamp from the sender.
412    pub announced_at: u64,
413    /// Local timestamp (seconds) when this record was last updated.
414    pub last_seen: u64,
415    /// Raw ML-DSA-65 machine public key bytes from the announcement.
416    ///
417    /// Used to verify rendezvous `ProviderSummary` signatures before
418    /// trusting addresses received via the rendezvous shard topic.
419    #[doc(hidden)]
420    pub machine_public_key: Vec<u8>,
421    /// NAT type reported by this agent (e.g. "FullCone", "Symmetric", "Unknown").
422    /// `None` if the agent did not include NAT information.
423    pub nat_type: Option<String>,
424    /// Whether this agent's machine can receive direct inbound connections.
425    /// `None` if not reported.
426    pub can_receive_direct: Option<bool>,
427    /// Whether this agent's machine is acting as a relay for peers behind strict NATs.
428    /// `None` if not reported.
429    pub is_relay: Option<bool>,
430    /// Whether this agent's machine is coordinating NAT traversal timing for peers.
431    /// `None` if not reported.
432    pub is_coordinator: Option<bool>,
433}
434
435/// Builder for configuring an [`Agent`] before connecting to the network.
436///
437/// The builder allows customization of the agent's identity:
438/// - Machine key path: Where to store/load the machine keypair
439/// - Agent keypair: Import a portable agent identity from another machine
440/// - User keypair: Bind a human identity to this agent
441///
442/// # Example
443///
444/// ```ignore
445/// use x0x::Agent;
446///
447/// // Default: auto-generates both keypairs
448/// let agent = Agent::builder()
449///     .build()
450///     .await?;
451///
452/// // Custom machine key path
453/// let agent = Agent::builder()
454///     .with_machine_key("/custom/path/machine.key")
455///     .build()
456///     .await?;
457///
458/// // Import agent keypair
459/// let agent_kp = load_agent_keypair()?;
460/// let agent = Agent::builder()
461///     .with_agent_key(agent_kp)
462///     .build()
463///     .await?;
464///
465/// // With user identity (three-layer)
466/// let agent = Agent::builder()
467///     .with_user_key_path("~/.x0x/user.key")
468///     .build()
469///     .await?;
470/// ```
471#[derive(Debug)]
472pub struct AgentBuilder {
473    machine_key_path: Option<std::path::PathBuf>,
474    agent_keypair: Option<identity::AgentKeypair>,
475    agent_key_path: Option<std::path::PathBuf>,
476    user_keypair: Option<identity::UserKeypair>,
477    user_key_path: Option<std::path::PathBuf>,
478    #[allow(dead_code)]
479    network_config: Option<network::NetworkConfig>,
480    peer_cache_dir: Option<std::path::PathBuf>,
481    heartbeat_interval_secs: Option<u64>,
482    identity_ttl_secs: Option<u64>,
483    /// Custom path for the contacts file.
484    contact_store_path: Option<std::path::PathBuf>,
485}
486
487/// Context captured by the background identity heartbeat task.
488struct HeartbeatContext {
489    identity: std::sync::Arc<identity::Identity>,
490    runtime: std::sync::Arc<gossip::GossipRuntime>,
491    network: std::sync::Arc<network::NetworkNode>,
492    interval_secs: u64,
493    cache: std::sync::Arc<
494        tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
495    >,
496}
497
498impl HeartbeatContext {
499    async fn announce(&self) -> error::Result<()> {
500        let machine_public_key = self
501            .identity
502            .machine_keypair()
503            .public_key()
504            .as_bytes()
505            .to_vec();
506        let announced_at = Agent::unix_timestamp_secs();
507
508        // Include ALL routable addresses (IPv4 and IPv6) so other agents
509        // can connect to us via whichever protocol they support.
510        let mut addresses = match self.network.node_status().await {
511            Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
512            _ => match self.network.routable_addr().await {
513                Some(addr) => vec![addr],
514                None => Vec::new(),
515            },
516        };
517
518        // Detect global IPv6 address locally (ant-quic currently only
519        // reports IPv4 via OBSERVED_ADDRESS). Uses UDP connect trick —
520        // no data is sent, the OS routing table resolves our source addr.
521        let port = addresses.first().map(|a| a.port()).unwrap_or(5483);
522        if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
523            if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
524                if let Ok(local) = sock.local_addr() {
525                    if let std::net::IpAddr::V6(v6) = local.ip() {
526                        let segs = v6.segments();
527                        let is_global = (segs[0] & 0xffc0) != 0xfe80
528                            && (segs[0] & 0xff00) != 0xfd00
529                            && !v6.is_loopback();
530                        if is_global {
531                            let v6_addr = std::net::SocketAddr::new(std::net::IpAddr::V6(v6), port);
532                            if !addresses.contains(&v6_addr) {
533                                addresses.push(v6_addr);
534                            }
535                        }
536                    }
537                }
538            }
539        }
540
541        // Query NAT and relay status from the network layer.
542        let (nat_type, can_receive_direct, is_relay, is_coordinator) =
543            match self.network.node_status().await {
544                Some(status) => (
545                    Some(status.nat_type.to_string()),
546                    Some(status.can_receive_direct),
547                    Some(status.is_relaying),
548                    Some(status.is_coordinating),
549                ),
550                None => (None, None, None, None),
551            };
552
553        let unsigned = IdentityAnnouncementUnsigned {
554            agent_id: self.identity.agent_id(),
555            machine_id: self.identity.machine_id(),
556            user_id: self
557                .identity
558                .user_keypair()
559                .map(identity::UserKeypair::user_id),
560            agent_certificate: self.identity.agent_certificate().cloned(),
561            machine_public_key: machine_public_key.clone(),
562            addresses,
563            announced_at,
564            nat_type: nat_type.clone(),
565            can_receive_direct,
566            is_relay,
567            is_coordinator,
568        };
569        let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
570            error::IdentityError::Serialization(format!(
571                "heartbeat: failed to serialize announcement: {e}"
572            ))
573        })?;
574        let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
575            self.identity.machine_keypair().secret_key(),
576            &unsigned_bytes,
577        )
578        .map_err(|e| {
579            error::IdentityError::Storage(std::io::Error::other(format!(
580                "heartbeat: failed to sign announcement: {:?}",
581                e
582            )))
583        })?
584        .as_bytes()
585        .to_vec();
586
587        let announcement = IdentityAnnouncement {
588            agent_id: unsigned.agent_id,
589            machine_id: unsigned.machine_id,
590            user_id: unsigned.user_id,
591            agent_certificate: unsigned.agent_certificate,
592            machine_public_key: machine_public_key.clone(),
593            machine_signature,
594            addresses: unsigned.addresses,
595            announced_at,
596            nat_type,
597            can_receive_direct,
598            is_relay,
599            is_coordinator,
600        };
601        let encoded = bincode::serialize(&announcement).map_err(|e| {
602            error::IdentityError::Serialization(format!(
603                "heartbeat: failed to serialize announcement: {e}"
604            ))
605        })?;
606        self.runtime
607            .pubsub()
608            .publish(
609                IDENTITY_ANNOUNCE_TOPIC.to_string(),
610                bytes::Bytes::from(encoded),
611            )
612            .await
613            .map_err(|e| {
614                error::IdentityError::Storage(std::io::Error::other(format!(
615                    "heartbeat: publish failed: {e}"
616                )))
617            })?;
618        let now = Agent::unix_timestamp_secs();
619        self.cache.write().await.insert(
620            announcement.agent_id,
621            DiscoveredAgent {
622                agent_id: announcement.agent_id,
623                machine_id: announcement.machine_id,
624                user_id: announcement.user_id,
625                addresses: announcement.addresses,
626                announced_at: announcement.announced_at,
627                last_seen: now,
628                machine_public_key: machine_public_key.clone(),
629                nat_type: announcement.nat_type.clone(),
630                can_receive_direct: announcement.can_receive_direct,
631                is_relay: announcement.is_relay,
632                is_coordinator: announcement.is_coordinator,
633            },
634        );
635        Ok(())
636    }
637}
638
639impl Agent {
640    /// Create a new agent with default configuration.
641    ///
642    /// This generates a fresh identity with both machine and agent keypairs.
643    /// The machine keypair is stored persistently in `~/.x0x/machine.key`.
644    ///
645    /// For more control, use [`Agent::builder()`].
646    pub async fn new() -> error::Result<Self> {
647        Agent::builder().build().await
648    }
649
650    /// Create an [`AgentBuilder`] for fine-grained configuration.
651    ///
652    /// The builder supports:
653    /// - Custom machine key path via `with_machine_key()`
654    /// - Imported agent keypair via `with_agent_key()`
655    /// - User identity via `with_user_key()` or `with_user_key_path()`
656    pub fn builder() -> AgentBuilder {
657        AgentBuilder {
658            machine_key_path: None,
659            agent_keypair: None,
660            agent_key_path: None,
661            user_keypair: None,
662            user_key_path: None,
663            network_config: None,
664            peer_cache_dir: None,
665            heartbeat_interval_secs: None,
666            identity_ttl_secs: None,
667            contact_store_path: None,
668        }
669    }
670
671    /// Get the agent's identity.
672    ///
673    /// # Returns
674    ///
675    /// A reference to the agent's [`identity::Identity`].
676    #[inline]
677    #[must_use]
678    pub fn identity(&self) -> &identity::Identity {
679        &self.identity
680    }
681
682    /// Get the machine ID for this agent.
683    ///
684    /// The machine ID is tied to this computer and used for QUIC transport
685    /// authentication. It is stored persistently in `~/.x0x/machine.key`.
686    ///
687    /// # Returns
688    ///
689    /// The agent's machine ID.
690    #[inline]
691    #[must_use]
692    pub fn machine_id(&self) -> identity::MachineId {
693        self.identity.machine_id()
694    }
695
696    /// Get the agent ID for this agent.
697    ///
698    /// The agent ID is portable across machines and represents the agent's
699    /// persistent identity. It can be exported and imported to run the same
700    /// agent on different computers.
701    ///
702    /// # Returns
703    ///
704    /// The agent's ID.
705    #[inline]
706    #[must_use]
707    pub fn agent_id(&self) -> identity::AgentId {
708        self.identity.agent_id()
709    }
710
711    /// Get the user ID for this agent, if a user identity is bound.
712    ///
713    /// Returns `None` if no user keypair was provided during construction.
714    /// User keys are opt-in — they are never auto-generated.
715    #[inline]
716    #[must_use]
717    pub fn user_id(&self) -> Option<identity::UserId> {
718        self.identity.user_id()
719    }
720
721    /// Get the agent certificate, if one exists.
722    ///
723    /// The certificate cryptographically binds this agent to a user identity.
724    #[inline]
725    #[must_use]
726    pub fn agent_certificate(&self) -> Option<&identity::AgentCertificate> {
727        self.identity.agent_certificate()
728    }
729
730    /// Get the network node, if initialized.
731    #[must_use]
732    pub fn network(&self) -> Option<&std::sync::Arc<network::NetworkNode>> {
733        self.network.as_ref()
734    }
735
736    /// Get the gossip cache adapter for coordinator discovery.
737    ///
738    /// Returns `None` if this agent was built without a network config.
739    /// The adapter wraps the same `Arc<BootstrapCache>` as the network node.
740    pub fn gossip_cache_adapter(&self) -> Option<&saorsa_gossip_coordinator::GossipCacheAdapter> {
741        self.gossip_cache_adapter.as_ref()
742    }
743
744    /// Get the presence system wrapper, if configured.
745    ///
746    /// Returns `None` if this agent was built without a network config.
747    /// The presence wrapper provides beacon broadcasting, FOAF discovery,
748    /// and online/offline event subscriptions.
749    #[must_use]
750    pub fn presence_system(&self) -> Option<&std::sync::Arc<presence::PresenceWrapper>> {
751        self.presence.as_ref()
752    }
753
754    /// Get a reference to the contact store.
755    ///
756    /// The contact store persists trust levels and machine records for known
757    /// agents. It is backed by `~/.x0x/contacts.json` by default.
758    ///
759    /// Use [`with_contact_store_path`](AgentBuilder::with_contact_store_path)
760    /// on the builder to customise the path.
761    #[must_use]
762    pub fn contacts(&self) -> &std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>> {
763        &self.contact_store
764    }
765
766    /// Get the reachability information for a discovered agent.
767    ///
768    /// Returns `None` if the agent is not in the discovery cache.
769    /// Use [`Agent::announce_identity`] or wait for a heartbeat announcement
770    /// to populate the cache.
771    pub async fn reachability(
772        &self,
773        agent_id: &identity::AgentId,
774    ) -> Option<connectivity::ReachabilityInfo> {
775        let cache = self.identity_discovery_cache.read().await;
776        cache
777            .get(agent_id)
778            .map(connectivity::ReachabilityInfo::from_discovered)
779    }
780
781    /// Attempt to connect to an agent by its identity.
782    ///
783    /// Looks up the agent in the discovery cache, then tries to establish
784    /// a QUIC connection using the best available strategy:
785    ///
786    /// 1. **Direct** — if the agent reports `can_receive_direct: true` or
787    ///    has a traversable NAT type, try each known address in order.
788    /// 2. **Coordinated** — if direct fails or the agent reports a symmetric
789    ///    NAT, the outcome is `Coordinated` if any address was reachable via
790    ///    the network layer's NAT traversal.
791    /// 3. **Unreachable** — no address succeeded.
792    /// 4. **NotFound** — the agent is not in the discovery cache.
793    ///
794    /// # Errors
795    ///
796    /// Returns an error only for internal failures (e.g. network not started).
797    /// Connectivity failures are reported as `ConnectOutcome::Unreachable`.
798    pub async fn connect_to_agent(
799        &self,
800        agent_id: &identity::AgentId,
801    ) -> error::Result<connectivity::ConnectOutcome> {
802        // 1. Look up in discovery cache
803        let discovered = {
804            let cache = self.identity_discovery_cache.read().await;
805            cache.get(agent_id).cloned()
806        };
807
808        let agent = match discovered {
809            Some(a) => a,
810            None => return Ok(connectivity::ConnectOutcome::NotFound),
811        };
812
813        let info = connectivity::ReachabilityInfo::from_discovered(&agent);
814
815        if info.addresses.is_empty() {
816            return Ok(connectivity::ConnectOutcome::Unreachable);
817        }
818
819        let Some(ref network) = self.network else {
820            return Ok(connectivity::ConnectOutcome::Unreachable);
821        };
822
823        // 2. If already connected via gossip, reuse that connection
824        let machine_peer_id = ant_quic::PeerId(agent.machine_id.0);
825        if network.is_connected(&machine_peer_id).await {
826            self.direct_messaging
827                .mark_connected(agent.agent_id, agent.machine_id)
828                .await;
829            // Return the first address as the connected address
830            if let Some(addr) = info.addresses.first() {
831                return Ok(connectivity::ConnectOutcome::Direct(*addr));
832            }
833        }
834
835        // 3. Try direct connection if likely to succeed
836        if info.likely_direct() {
837            for addr in &info.addresses {
838                match network.connect_addr(*addr).await {
839                    Ok(connected_peer_id) => {
840                        // Use the real PeerId from the QUIC handshake (may differ
841                        // from a zeroed placeholder in the discovery cache).
842                        let real_machine_id = identity::MachineId(connected_peer_id.0);
843                        // Enrich bootstrap cache with this successful address
844                        if let Some(ref bc) = self.bootstrap_cache {
845                            bc.add_from_connection(connected_peer_id, vec![*addr], None)
846                                .await;
847                        }
848                        // Update discovery cache with real machine_id
849                        {
850                            let mut cache = self.identity_discovery_cache.write().await;
851                            if let Some(entry) = cache.get_mut(agent_id) {
852                                entry.machine_id = real_machine_id;
853                            }
854                        }
855                        // Register agent mapping for direct messaging
856                        self.direct_messaging
857                            .mark_connected(agent.agent_id, real_machine_id)
858                            .await;
859                        return Ok(connectivity::ConnectOutcome::Direct(*addr));
860                    }
861                    Err(e) => {
862                        tracing::debug!("Direct connect to {} failed: {}", addr, e);
863                    }
864                }
865            }
866        }
867
868        // 3. If direct failed and coordination may help, ensure a coordinator is
869        //    connected first (gives ant-quic a relay path), then retry addresses.
870        //    The network layer handles NAT traversal internally via QUIC extension frames.
871        if info.needs_coordination() || !info.likely_direct() {
872            // Ensure we're connected to a reachable peer that can act as a
873            // coordinator/relay for NAT hole-punching. Any peer with
874            // can_receive_direct serves as a potential mutual peer for
875            // ant-quic's PUNCH_ME_NOW coordination.
876            {
877                let cache = self.identity_discovery_cache.read().await;
878                let reachable: Vec<std::net::SocketAddr> = cache
879                    .values()
880                    .filter(|a| a.can_receive_direct == Some(true))
881                    .flat_map(|a| a.addresses.clone())
882                    .take(6)
883                    .collect();
884                drop(cache);
885                for addr in &reachable {
886                    if network.connect_addr(*addr).await.is_ok() {
887                        tracing::debug!(
888                            addr = %addr,
889                            "Connected to reachable peer for NAT coordination"
890                        );
891                        break;
892                    }
893                }
894            }
895
896            for addr in &info.addresses {
897                match network.connect_addr(*addr).await {
898                    Ok(connected_peer_id) => {
899                        let real_machine_id = identity::MachineId(connected_peer_id.0);
900                        if let Some(ref bc) = self.bootstrap_cache {
901                            bc.add_from_connection(connected_peer_id, vec![*addr], None)
902                                .await;
903                        }
904                        // Update discovery cache with real machine_id
905                        {
906                            let mut cache = self.identity_discovery_cache.write().await;
907                            if let Some(entry) = cache.get_mut(agent_id) {
908                                entry.machine_id = real_machine_id;
909                            }
910                        }
911                        // Register agent mapping for direct messaging
912                        self.direct_messaging
913                            .mark_connected(agent.agent_id, real_machine_id)
914                            .await;
915                        return Ok(connectivity::ConnectOutcome::Coordinated(*addr));
916                    }
917                    Err(e) => {
918                        tracing::debug!("Coordinated connect to {} failed: {}", addr, e);
919                    }
920                }
921            }
922        }
923
924        Ok(connectivity::ConnectOutcome::Unreachable)
925    }
926
927    /// Save the bootstrap cache and release resources.
928    ///
929    /// Call this before dropping the agent to ensure the peer cache is
930    /// persisted to disk. The background maintenance task saves periodically,
931    /// but this guarantees a final save.
932    pub async fn shutdown(&self) {
933        // Shut down presence beacons first.
934        if let Some(ref pw) = self.presence {
935            pw.shutdown().await;
936            tracing::info!("Presence system shut down");
937        }
938
939        if let Some(ref cache) = self.bootstrap_cache {
940            if let Err(e) = cache.save().await {
941                tracing::warn!("Failed to save bootstrap cache on shutdown: {e}");
942            } else {
943                tracing::info!("Bootstrap cache saved on shutdown");
944            }
945        }
946    }
947
948    // === Direct Messaging ===
949
950    /// Send data directly to a connected agent.
951    ///
952    /// This bypasses gossip pub/sub for efficient point-to-point communication.
953    /// The agent must be connected first via [`Self::connect_to_agent`].
954    ///
955    /// # Arguments
956    ///
957    /// * `agent_id` - The target agent's identifier.
958    /// * `payload` - The data to send.
959    ///
960    /// # Errors
961    ///
962    /// Returns an error if:
963    /// - Network is not initialized
964    /// - Agent is not connected
965    /// - Agent is not found in discovery cache
966    /// - Send fails
967    ///
968    /// # Example
969    ///
970    /// ```rust,ignore
971    /// // First connect to the agent
972    /// let outcome = agent.connect_to_agent(&target_agent_id).await?;
973    ///
974    /// // Then send data directly
975    /// agent.send_direct(&target_agent_id, b"hello".to_vec()).await?;
976    /// ```
977    pub async fn send_direct(
978        &self,
979        agent_id: &identity::AgentId,
980        payload: Vec<u8>,
981    ) -> error::NetworkResult<()> {
982        let network = self.network.as_ref().ok_or_else(|| {
983            error::NetworkError::NodeCreation("network not initialized".to_string())
984        })?;
985
986        // Look up machine_id from discovery cache, falling back to DirectMessaging registry
987        let cached_machine_id = {
988            let cache = self.identity_discovery_cache.read().await;
989            cache
990                .get(agent_id)
991                .map(|d| d.machine_id)
992                .filter(|m| m.0 != [0u8; 32]) // Ignore placeholder zeroed IDs
993        };
994        let machine_id = match cached_machine_id {
995            Some(id) => id,
996            None => {
997                // Fallback: check DirectMessaging agent→machine registry
998                // (populated when we receive direct messages or after connect_to_agent)
999                match self.direct_messaging.get_machine_id(agent_id).await {
1000                    Some(id) => id,
1001                    None => {
1002                        // Last resort: try connect_to_agent which may discover the
1003                        // machine_id via QUIC handshake and update the cache.
1004                        let _ = self.connect_to_agent(agent_id).await;
1005                        self.direct_messaging
1006                            .get_machine_id(agent_id)
1007                            .await
1008                            .ok_or(error::NetworkError::AgentNotFound(agent_id.0))?
1009                    }
1010                }
1011            }
1012        };
1013
1014        // Check if connected
1015        let ant_peer_id = ant_quic::PeerId(machine_id.0);
1016        if !network.is_connected(&ant_peer_id).await {
1017            return Err(error::NetworkError::AgentNotConnected(agent_id.0));
1018        }
1019
1020        // Send via network layer
1021        network
1022            .send_direct(&ant_peer_id, &self.identity.agent_id().0, &payload)
1023            .await?;
1024
1025        tracing::info!(
1026            "Sent {} bytes directly to agent {:?}",
1027            payload.len(),
1028            agent_id
1029        );
1030
1031        Ok(())
1032    }
1033
1034    /// Receive the next direct message from any connected agent.
1035    ///
1036    /// Blocks until a direct message is received.
1037    ///
1038    /// # Security Note
1039    ///
1040    /// This method does **not** apply trust filtering from `ContactStore`.
1041    /// Messages from blocked agents will still be delivered. Use
1042    /// [`recv_direct_filtered()`](Self::recv_direct_filtered) if you need
1043    /// trust-based filtering.
1044    ///
1045    /// # Returns
1046    ///
1047    /// The received [`DirectMessage`] containing sender, payload, and timestamp.
1048    ///
1049    /// # Example
1050    ///
1051    /// ```rust,ignore
1052    /// loop {
1053    ///     if let Some(msg) = agent.recv_direct().await {
1054    ///         println!("From {:?}: {:?}", msg.sender, msg.payload_str());
1055    ///     }
1056    /// }
1057    /// ```
1058    pub async fn recv_direct(&self) -> Option<direct::DirectMessage> {
1059        self.recv_direct_inner().await
1060    }
1061
1062    /// Receive the next direct message, filtering by trust level.
1063    ///
1064    /// Messages from blocked agents are silently dropped. This mirrors the
1065    /// behavior of gossip pub/sub message filtering.
1066    ///
1067    /// # Returns
1068    ///
1069    /// The received [`DirectMessage`], or `None` if the channel closes.
1070    /// Messages from blocked senders are dropped and the method continues
1071    /// waiting for the next acceptable message.
1072    ///
1073    /// # Example
1074    ///
1075    /// ```rust,ignore
1076    /// // Block an agent
1077    /// {
1078    ///     let mut contacts = agent.contacts().write().await;
1079    ///     contacts.set_trust(&bad_agent_id, TrustLevel::Blocked);
1080    /// }
1081    ///
1082    /// // Messages from blocked agents are silently dropped
1083    /// loop {
1084    ///     if let Some(msg) = agent.recv_direct_filtered().await {
1085    ///         // msg.sender is not in the blocked list
1086    ///         // (note: sender is self-asserted, see DirectMessage docs)
1087    ///     }
1088    /// }
1089    /// ```
1090    pub async fn recv_direct_filtered(&self) -> Option<direct::DirectMessage> {
1091        loop {
1092            let msg = self.recv_direct_inner().await?;
1093
1094            // Check trust level
1095            let contacts = self.contact_store.read().await;
1096            if let Some(contact) = contacts.get(&msg.sender) {
1097                if contact.trust_level == contacts::TrustLevel::Blocked {
1098                    tracing::debug!(
1099                        "Dropping direct message from blocked agent {:?}",
1100                        msg.sender
1101                    );
1102                    continue;
1103                }
1104            }
1105
1106            return Some(msg);
1107        }
1108    }
1109
1110    /// Internal helper for receiving direct messages.
1111    ///
1112    /// Reads from the `DirectMessaging` internal channel, which is fed by
1113    /// the background `start_direct_listener` task. This ensures there is
1114    /// only ONE consumer of `network.recv_direct()` (the listener), avoiding
1115    /// message-stealing races.
1116    async fn recv_direct_inner(&self) -> Option<direct::DirectMessage> {
1117        self.direct_messaging.recv().await
1118    }
1119
1120    /// Subscribe to direct messages.
1121    ///
1122    /// Returns a receiver that can be cloned for multiple consumers.
1123    /// Messages are broadcast to all receivers.
1124    ///
1125    /// # Example
1126    ///
1127    /// ```rust,ignore
1128    /// let mut rx = agent.subscribe_direct();
1129    /// tokio::spawn(async move {
1130    ///     while let Some(msg) = rx.recv().await {
1131    ///         println!("Direct message: {:?}", msg);
1132    ///     }
1133    /// });
1134    /// ```
1135    pub fn subscribe_direct(&self) -> direct::DirectMessageReceiver {
1136        self.direct_messaging.subscribe()
1137    }
1138
1139    /// Get the direct messaging infrastructure.
1140    ///
1141    /// Provides low-level access to connection tracking and agent mappings.
1142    pub fn direct_messaging(&self) -> &std::sync::Arc<direct::DirectMessaging> {
1143        &self.direct_messaging
1144    }
1145
1146    /// Check if an agent is currently connected for direct messaging.
1147    ///
1148    /// # Arguments
1149    ///
1150    /// * `agent_id` - The agent to check.
1151    ///
1152    /// # Returns
1153    ///
1154    /// `true` if a QUIC connection exists to this agent's machine.
1155    pub async fn is_agent_connected(&self, agent_id: &identity::AgentId) -> bool {
1156        let Some(network) = &self.network else {
1157            return false;
1158        };
1159
1160        // Look up machine_id from discovery cache
1161        let machine_id = {
1162            let cache = self.identity_discovery_cache.read().await;
1163            cache.get(agent_id).map(|d| d.machine_id)
1164        };
1165
1166        match machine_id {
1167            Some(mid) => {
1168                let ant_peer_id = ant_quic::PeerId(mid.0);
1169                network.is_connected(&ant_peer_id).await
1170            }
1171            None => false,
1172        }
1173    }
1174
1175    /// Get list of currently connected agents.
1176    ///
1177    /// Returns agents that have been discovered and are currently connected
1178    /// via QUIC transport.
1179    pub async fn connected_agents(&self) -> Vec<identity::AgentId> {
1180        let Some(network) = &self.network else {
1181            return Vec::new();
1182        };
1183
1184        let connected_peers = network.connected_peers().await;
1185        let cache = self.identity_discovery_cache.read().await;
1186
1187        // Find agents whose machine_id matches a connected peer
1188        cache
1189            .values()
1190            .filter(|agent| {
1191                let ant_peer_id = ant_quic::PeerId(agent.machine_id.0);
1192                connected_peers.contains(&ant_peer_id)
1193            })
1194            .map(|agent| agent.agent_id)
1195            .collect()
1196    }
1197
1198    /// Attach a contact store for trust-based message filtering.
1199    ///
1200    /// When set, the gossip pub/sub layer will:
1201    /// - Drop messages from `Blocked` senders (don't deliver, don't rebroadcast)
1202    /// - Annotate messages with the sender's trust level for consumers
1203    ///
1204    /// Without a contact store, all messages pass through (open relay mode).
1205    pub fn set_contacts(&self, store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>) {
1206        if let Some(runtime) = &self.gossip_runtime {
1207            runtime.pubsub().set_contacts(store);
1208        }
1209    }
1210
1211    /// Announce this agent's identity on the network discovery topic.
1212    ///
1213    /// By default, announcements include agent + machine identity only.
1214    /// Human identity disclosure is opt-in and requires explicit consent.
1215    ///
1216    /// # Arguments
1217    ///
1218    /// * `include_user_identity` - Whether to include `user_id` and certificate
1219    /// * `human_consent` - Must be `true` when disclosing user identity
1220    ///
1221    /// # Errors
1222    ///
1223    /// Returns an error if:
1224    /// - Gossip runtime is not initialized
1225    /// - Human identity disclosure is requested without explicit consent
1226    /// - Human identity disclosure is requested but no user identity is configured
1227    /// - Serialization or publish fails
1228    pub async fn announce_identity(
1229        &self,
1230        include_user_identity: bool,
1231        human_consent: bool,
1232    ) -> error::Result<()> {
1233        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1234            error::IdentityError::Storage(std::io::Error::other(
1235                "gossip runtime not initialized - configure agent with network first",
1236            ))
1237        })?;
1238
1239        self.start_identity_listener().await?;
1240
1241        // Include ALL routable addresses (IPv4 and IPv6).
1242        let mut addresses = if let Some(network) = self.network.as_ref() {
1243            match network.node_status().await {
1244                Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
1245                _ => match network.routable_addr().await {
1246                    Some(addr) => vec![addr],
1247                    None => self.announcement_addresses(),
1248                },
1249            }
1250        } else {
1251            self.announcement_addresses()
1252        };
1253        // Detect addresses locally via UDP socket tricks.
1254        // ant-quic discovers public IPv4 via OBSERVED_ADDRESS from peers.
1255        // IPv6 is globally routable (no NAT), so we probe locally.
1256        let port = addresses.first().map(|a| a.port()).unwrap_or(5483);
1257
1258        // IPv6 probe
1259        if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
1260            if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
1261                if let Ok(local) = sock.local_addr() {
1262                    if let std::net::IpAddr::V6(v6) = local.ip() {
1263                        let segs = v6.segments();
1264                        let is_global = (segs[0] & 0xffc0) != 0xfe80
1265                            && (segs[0] & 0xff00) != 0xfd00
1266                            && !v6.is_loopback();
1267                        if is_global {
1268                            let v6_addr = std::net::SocketAddr::new(std::net::IpAddr::V6(v6), port);
1269                            if !addresses.contains(&v6_addr) {
1270                                addresses.push(v6_addr);
1271                            }
1272                        }
1273                    }
1274                }
1275            }
1276        }
1277        let announcement = self.build_identity_announcement_with_addrs(
1278            include_user_identity,
1279            human_consent,
1280            addresses,
1281        )?;
1282
1283        let encoded = bincode::serialize(&announcement).map_err(|e| {
1284            error::IdentityError::Serialization(format!(
1285                "failed to serialize identity announcement: {e}"
1286            ))
1287        })?;
1288
1289        let payload = bytes::Bytes::from(encoded);
1290
1291        // Publish to shard topic first (future-proof routing).
1292        let shard_topic = shard_topic_for_agent(&announcement.agent_id);
1293        runtime
1294            .pubsub()
1295            .publish(shard_topic, payload.clone())
1296            .await
1297            .map_err(|e| {
1298                error::IdentityError::Storage(std::io::Error::other(format!(
1299                    "failed to publish identity announcement to shard topic: {e}"
1300                )))
1301            })?;
1302
1303        // Also publish to legacy broadcast topic for backward compatibility.
1304        runtime
1305            .pubsub()
1306            .publish(IDENTITY_ANNOUNCE_TOPIC.to_string(), payload)
1307            .await
1308            .map_err(|e| {
1309                error::IdentityError::Storage(std::io::Error::other(format!(
1310                    "failed to publish identity announcement: {e}"
1311                )))
1312            })?;
1313
1314        let now = Self::unix_timestamp_secs();
1315        self.identity_discovery_cache.write().await.insert(
1316            announcement.agent_id,
1317            DiscoveredAgent {
1318                agent_id: announcement.agent_id,
1319                machine_id: announcement.machine_id,
1320                user_id: announcement.user_id,
1321                addresses: announcement.addresses.clone(),
1322                announced_at: announcement.announced_at,
1323                last_seen: now,
1324                machine_public_key: announcement.machine_public_key.clone(),
1325                nat_type: announcement.nat_type.clone(),
1326                can_receive_direct: announcement.can_receive_direct,
1327                is_relay: announcement.is_relay,
1328                is_coordinator: announcement.is_coordinator,
1329            },
1330        );
1331
1332        Ok(())
1333    }
1334
1335    /// Get all discovered agents from identity announcements.
1336    ///
1337    /// # Errors
1338    ///
1339    /// Returns an error if the gossip runtime is not initialized.
1340    pub async fn discovered_agents(&self) -> error::Result<Vec<DiscoveredAgent>> {
1341        self.start_identity_listener().await?;
1342        let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
1343        let mut agents: Vec<_> = self
1344            .identity_discovery_cache
1345            .read()
1346            .await
1347            .values()
1348            .filter(|a| a.announced_at >= cutoff)
1349            .cloned()
1350            .collect();
1351        agents.sort_by(|a, b| a.agent_id.0.cmp(&b.agent_id.0));
1352        Ok(agents)
1353    }
1354
1355    /// Return all discovered agents regardless of TTL.
1356    ///
1357    /// Unlike [`Self::discovered_agents`], this method skips TTL filtering and
1358    /// returns all cache entries, including stale ones. Useful for debugging.
1359    ///
1360    /// # Errors
1361    ///
1362    /// Returns an error if the gossip runtime is not initialized.
1363    pub async fn discovered_agents_unfiltered(&self) -> error::Result<Vec<DiscoveredAgent>> {
1364        self.start_identity_listener().await?;
1365        let mut agents: Vec<_> = self
1366            .identity_discovery_cache
1367            .read()
1368            .await
1369            .values()
1370            .cloned()
1371            .collect();
1372        agents.sort_by(|a, b| a.agent_id.0.cmp(&b.agent_id.0));
1373        Ok(agents)
1374    }
1375
1376    /// Get one discovered agent record by agent ID.
1377    ///
1378    /// # Errors
1379    ///
1380    /// Returns an error if the gossip runtime is not initialized.
1381    pub async fn discovered_agent(
1382        &self,
1383        agent_id: identity::AgentId,
1384    ) -> error::Result<Option<DiscoveredAgent>> {
1385        self.start_identity_listener().await?;
1386        Ok(self
1387            .identity_discovery_cache
1388            .read()
1389            .await
1390            .get(&agent_id)
1391            .cloned())
1392    }
1393
1394    async fn start_identity_listener(&self) -> error::Result<()> {
1395        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1396            error::IdentityError::Storage(std::io::Error::other(
1397                "gossip runtime not initialized - configure agent with network first",
1398            ))
1399        })?;
1400
1401        if self
1402            .identity_listener_started
1403            .swap(true, std::sync::atomic::Ordering::AcqRel)
1404        {
1405            return Ok(());
1406        }
1407
1408        let mut sub_legacy = runtime
1409            .pubsub()
1410            .subscribe(IDENTITY_ANNOUNCE_TOPIC.to_string())
1411            .await;
1412        let own_shard_topic = shard_topic_for_agent(&self.agent_id());
1413        let mut sub_shard = runtime.pubsub().subscribe(own_shard_topic).await;
1414        let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
1415        let bootstrap_cache = self.bootstrap_cache.clone();
1416        let contact_store = std::sync::Arc::clone(&self.contact_store);
1417        let network = self.network.as_ref().map(std::sync::Arc::clone);
1418        let own_agent_id = self.agent_id();
1419
1420        tokio::spawn(async move {
1421            // Track agents we've already initiated auto-connect to, preventing
1422            // duplicate connection attempts from concurrent announcements.
1423            let mut auto_connect_attempted = std::collections::HashSet::<identity::AgentId>::new();
1424
1425            loop {
1426                // Drain whichever subscription fires next; deduplicate by AgentId in cache.
1427                let msg = tokio::select! {
1428                    Some(m) = sub_legacy.recv() => m,
1429                    Some(m) = sub_shard.recv() => m,
1430                    else => break,
1431                };
1432                let decoded = {
1433                    use bincode::Options;
1434                    bincode::options()
1435                        .with_fixint_encoding()
1436                        .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
1437                        .allow_trailing_bytes()
1438                        .deserialize::<IdentityAnnouncement>(&msg.payload)
1439                };
1440                let announcement = match decoded {
1441                    Ok(a) => a,
1442                    Err(e) => {
1443                        tracing::debug!("Ignoring invalid identity announcement payload: {}", e);
1444                        continue;
1445                    }
1446                };
1447
1448                if let Err(e) = announcement.verify() {
1449                    tracing::warn!("Ignoring unverifiable identity announcement: {}", e);
1450                    continue;
1451                }
1452
1453                // Evaluate trust for this (agent, machine) pair.
1454                // Blocked or machine-pinning violations are silently dropped.
1455                {
1456                    let store = contact_store.read().await;
1457                    let evaluator = trust::TrustEvaluator::new(&store);
1458                    let decision = evaluator.evaluate(&trust::TrustContext {
1459                        agent_id: &announcement.agent_id,
1460                        machine_id: &announcement.machine_id,
1461                    });
1462                    match decision {
1463                        trust::TrustDecision::RejectBlocked => {
1464                            tracing::debug!(
1465                                "Dropping identity announcement from blocked agent {:?}",
1466                                hex::encode(&announcement.agent_id.0[..8]),
1467                            );
1468                            continue;
1469                        }
1470                        trust::TrustDecision::RejectMachineMismatch => {
1471                            tracing::warn!(
1472                                "Dropping identity announcement from agent {:?}: machine {:?} not in pinned list",
1473                                hex::encode(&announcement.agent_id.0[..8]),
1474                                hex::encode(&announcement.machine_id.0[..8]),
1475                            );
1476                            continue;
1477                        }
1478                        _ => {}
1479                    }
1480                }
1481
1482                // Update machine records in the contact store.
1483                {
1484                    let mut store = contact_store.write().await;
1485                    let record = contacts::MachineRecord::new(announcement.machine_id, None);
1486                    store.add_machine(&announcement.agent_id, record);
1487                }
1488
1489                let now = std::time::SystemTime::now()
1490                    .duration_since(std::time::UNIX_EPOCH)
1491                    .map_or(0, |d| d.as_secs());
1492
1493                // Add announced addresses to the bootstrap cache so auto-dial
1494                // can connect to peers discovered via gossip announcements.
1495                // After identity unification, machine_id == ant-quic PeerId.
1496                if !announcement.addresses.is_empty() {
1497                    if let Some(ref bc) = &bootstrap_cache {
1498                        let peer_id = ant_quic::PeerId(announcement.machine_id.0);
1499                        bc.add_from_connection(peer_id, announcement.addresses.clone(), None)
1500                            .await;
1501                        tracing::debug!(
1502                            "Added {} addresses from identity announcement to bootstrap cache for agent {:?} (machine {:?})",
1503                            announcement.addresses.len(),
1504                            announcement.agent_id,
1505                            hex::encode(&announcement.machine_id.0[..8]),
1506                        );
1507                    }
1508                }
1509
1510                cache.write().await.insert(
1511                    announcement.agent_id,
1512                    DiscoveredAgent {
1513                        agent_id: announcement.agent_id,
1514                        machine_id: announcement.machine_id,
1515                        user_id: announcement.user_id,
1516                        addresses: announcement.addresses.clone(),
1517                        announced_at: announcement.announced_at,
1518                        last_seen: now,
1519                        machine_public_key: announcement.machine_public_key.clone(),
1520                        nat_type: announcement.nat_type.clone(),
1521                        can_receive_direct: announcement.can_receive_direct,
1522                        is_relay: announcement.is_relay,
1523                        is_coordinator: announcement.is_coordinator,
1524                    },
1525                );
1526
1527                // Auto-connect to discovered agents so pub/sub messages can route
1528                // between peers that share bootstrap nodes but aren't directly connected.
1529                // The gossip topology refresh (every 1s) will add the new peer to
1530                // PlumTree topic trees once the QUIC connection is established.
1531                if announcement.agent_id != own_agent_id
1532                    && !announcement.addresses.is_empty()
1533                    && !auto_connect_attempted.contains(&announcement.agent_id)
1534                {
1535                    if let Some(ref net) = &network {
1536                        let ant_peer = ant_quic::PeerId(announcement.machine_id.0);
1537                        if !net.is_connected(&ant_peer).await {
1538                            auto_connect_attempted.insert(announcement.agent_id);
1539                            let net = std::sync::Arc::clone(net);
1540                            let addresses = announcement.addresses.clone();
1541                            tokio::spawn(async move {
1542                                for addr in &addresses {
1543                                    match net.connect_addr(*addr).await {
1544                                        Ok(_) => {
1545                                            tracing::info!(
1546                                                "Auto-connected to discovered agent at {addr}",
1547                                            );
1548                                            return;
1549                                        }
1550                                        Err(e) => {
1551                                            tracing::debug!("Auto-connect to {addr} failed: {e}",);
1552                                        }
1553                                    }
1554                                }
1555                                tracing::debug!(
1556                                    "Auto-connect exhausted all {} addresses for discovered agent",
1557                                    addresses.len(),
1558                                );
1559                            });
1560                        }
1561                    }
1562                }
1563            }
1564        });
1565
1566        Ok(())
1567    }
1568
1569    fn unix_timestamp_secs() -> u64 {
1570        std::time::SystemTime::now()
1571            .duration_since(std::time::UNIX_EPOCH)
1572            .map_or(0, |d| d.as_secs())
1573    }
1574
1575    fn announcement_addresses(&self) -> Vec<std::net::SocketAddr> {
1576        // Try routable_addr synchronously via local_addr fallback.
1577        // The async routable_addr is used in HeartbeatContext::announce().
1578        match self.network.as_ref().and_then(|n| n.local_addr()) {
1579            Some(addr) if addr.port() > 0 && !addr.ip().is_unspecified() => vec![addr],
1580            _ => Vec::new(),
1581        }
1582    }
1583
1584    fn build_identity_announcement(
1585        &self,
1586        include_user_identity: bool,
1587        human_consent: bool,
1588    ) -> error::Result<IdentityAnnouncement> {
1589        self.build_identity_announcement_with_addrs(
1590            include_user_identity,
1591            human_consent,
1592            self.announcement_addresses(),
1593        )
1594    }
1595
1596    fn build_identity_announcement_with_addrs(
1597        &self,
1598        include_user_identity: bool,
1599        human_consent: bool,
1600        addresses: Vec<std::net::SocketAddr>,
1601    ) -> error::Result<IdentityAnnouncement> {
1602        if include_user_identity && !human_consent {
1603            return Err(error::IdentityError::Storage(std::io::Error::other(
1604                "human identity disclosure requires explicit human consent — set human_consent: true in the request body",
1605            )));
1606        }
1607
1608        let (user_id, agent_certificate) = if include_user_identity {
1609            let user_id = self.user_id().ok_or_else(|| {
1610                error::IdentityError::Storage(std::io::Error::other(
1611                    "human identity disclosure requested but no user identity is configured — set user_key_path in your config.toml to point at your user keypair file",
1612                ))
1613            })?;
1614            let cert = self.agent_certificate().cloned().ok_or_else(|| {
1615                error::IdentityError::Storage(std::io::Error::other(
1616                    "human identity disclosure requested but agent certificate is missing",
1617                ))
1618            })?;
1619            (Some(user_id), Some(cert))
1620        } else {
1621            (None, None)
1622        };
1623
1624        let machine_public_key = self
1625            .identity
1626            .machine_keypair()
1627            .public_key()
1628            .as_bytes()
1629            .to_vec();
1630
1631        // NAT status is populated by the heartbeat loop (which is async and has
1632        // access to NodeStatus). Here we use None for the NAT fields as this
1633        // sync builder doesn't have async access to the network layer.
1634        let unsigned = IdentityAnnouncementUnsigned {
1635            agent_id: self.agent_id(),
1636            machine_id: self.machine_id(),
1637            user_id,
1638            agent_certificate: agent_certificate.clone(),
1639            machine_public_key: machine_public_key.clone(),
1640            addresses,
1641            announced_at: Self::unix_timestamp_secs(),
1642            nat_type: None,
1643            can_receive_direct: None,
1644            is_relay: None,
1645            is_coordinator: None,
1646        };
1647        let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
1648            error::IdentityError::Serialization(format!(
1649                "failed to serialize unsigned identity announcement: {e}"
1650            ))
1651        })?;
1652        let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
1653            self.identity.machine_keypair().secret_key(),
1654            &unsigned_bytes,
1655        )
1656        .map_err(|e| {
1657            error::IdentityError::Storage(std::io::Error::other(format!(
1658                "failed to sign identity announcement with machine key: {:?}",
1659                e
1660            )))
1661        })?
1662        .as_bytes()
1663        .to_vec();
1664
1665        Ok(IdentityAnnouncement {
1666            agent_id: unsigned.agent_id,
1667            machine_id: unsigned.machine_id,
1668            user_id: unsigned.user_id,
1669            agent_certificate: unsigned.agent_certificate,
1670            machine_public_key,
1671            machine_signature,
1672            addresses: unsigned.addresses,
1673            announced_at: unsigned.announced_at,
1674            nat_type: unsigned.nat_type,
1675            can_receive_direct: unsigned.can_receive_direct,
1676            is_relay: unsigned.is_relay,
1677            is_coordinator: unsigned.is_coordinator,
1678        })
1679    }
1680
    /// Join the x0x gossip network.
    ///
    /// Start-up order:
    /// 1. Start the gossip runtime (if one is configured), then call
    ///    `start_identity_listener` and `start_direct_listener`.
    /// 2. Dial peers in phases. Phase 0 always runs when a bootstrap cache is
    ///    present. Phases 1 and 2 run only while fewer than `min_connected`
    ///    (3) addresses have connected:
    ///    - Phase 0: every address of the coordinators returned by the
    ///      bootstrap cache's `select_coordinators(6)`.
    ///    - Phase 1: cached peers from `select_peers(12)`, dialled by
    ///      ant-quic peer ID.
    ///    - Phase 2: the hardcoded `bootstrap_nodes` from the network config
    ///      (minus addresses already connected), dialled in parallel. Failures
    ///      get up to two retry rounds, after 10 s and then 15 s. The delay
    ///      gives stale connections on remote nodes time to expire.
    /// 3. Seed the HyParView membership join with the connected addresses.
    /// 4. If presence is configured: seed broadcast peers from the membership
    ///    active view, set address hints from the node status, start beacons
    ///    when enabled, and start the presence event loop.
    /// 5. Send an initial identity announcement and start the identity
    ///    heartbeat.
    ///
    /// If the agent was not configured with a network, this method
    /// succeeds gracefully (nothing to join). It also succeeds when no peer
    /// could be reached; the outcome is only logged.
    ///
    /// Phase 2 retries may make this call sleep for up to 25 s in total
    /// (10 s + 15 s), in addition to the time spent connecting.
    ///
    /// # Errors
    ///
    /// Returns an error only if the gossip runtime fails to start or
    /// `start_identity_listener` fails. Failures in every later step
    /// (dialling, membership join, beacons, announcement, heartbeat) are
    /// logged and do not fail the call.
    pub async fn join_network(&self) -> error::Result<()> {
        let Some(network) = self.network.as_ref() else {
            tracing::debug!("join_network called but no network configured");
            return Ok(());
        };

        if let Some(ref runtime) = self.gossip_runtime {
            runtime.start().await.map_err(|e| {
                error::IdentityError::Storage(std::io::Error::other(format!(
                    "failed to start gossip runtime: {e}"
                )))
            })?;
            tracing::info!("Gossip runtime started");
        }
        self.start_identity_listener().await?;
        self.start_direct_listener();

        let bootstrap_nodes = network.config().bootstrap_nodes.clone();

        // Target number of connected addresses; Phases 1 and 2 are skipped
        // once it is reached.
        // NOTE(review): this counts connected *addresses*, not distinct peers.
        // A coordinator with several addresses may count more than once.
        let min_connected = 3;
        // Every address that connected successfully, across all phases.
        // Later used as the HyParView seed list.
        let mut all_connected: Vec<std::net::SocketAddr> = Vec::new();

        // Phase 0: Try quality-scored coordinator peers from bootstrap cache.
        // The bootstrap cache learns about coordinator-capable peers passively
        // through normal connections — no coordinator gossip topic needed.
        if let Some(ref cache) = self.bootstrap_cache {
            let coordinators = cache.select_coordinators(6).await;
            // Flatten every known address of every selected coordinator.
            let coordinator_addrs: Vec<std::net::SocketAddr> = coordinators
                .iter()
                .flat_map(|peer| peer.addresses.clone())
                .collect();

            if !coordinator_addrs.is_empty() {
                tracing::info!(
                    "Phase 0: Trying {} addresses from {} cached coordinators",
                    coordinator_addrs.len(),
                    coordinators.len()
                );
                let (succeeded, _failed) = self
                    .connect_peers_parallel_tracked(network, &coordinator_addrs)
                    .await;
                all_connected.extend(&succeeded);
                tracing::info!(
                    "Phase 0: {}/{} coordinator addresses connected",
                    succeeded.len(),
                    coordinator_addrs.len()
                );
            }
        }

        // Phase 1: Try cached peers first using the real ant-quic peer IDs.
        if all_connected.len() < min_connected {
            if let Some(ref cache) = self.bootstrap_cache {
                const PHASE1_PEER_CANDIDATES: usize = 12;
                let cached_peers = cache.select_peers(PHASE1_PEER_CANDIDATES).await;
                if !cached_peers.is_empty() {
                    tracing::info!("Phase 1: Trying {} cached peers", cached_peers.len());
                    let (succeeded, _failed) = self
                        .connect_cached_peers_parallel_tracked(network, &cached_peers)
                        .await;
                    all_connected.extend(&succeeded);
                    tracing::info!(
                        "Phase 1: {}/{} cached peers connected",
                        succeeded.len(),
                        cached_peers.len()
                    );
                }
            }
        } // end Phase 1 min_connected check

        // Phase 2: Connect to hardcoded bootstrap nodes if we need more peers.
        // This is the fallback for when coordinator cache and cached peers aren't enough.
        if all_connected.len() < min_connected && !bootstrap_nodes.is_empty() {
            // Skip bootstrap addresses already connected in an earlier phase.
            let remaining: Vec<std::net::SocketAddr> = bootstrap_nodes
                .iter()
                .filter(|addr| !all_connected.contains(addr))
                .copied()
                .collect();

            // Round 1: Connect to all bootstrap peers in parallel
            let (succeeded, mut failed) = self
                .connect_peers_parallel_tracked(network, &remaining)
                .await;
            all_connected.extend(&succeeded);
            tracing::info!(
                "Phase 2 round 1: {}/{} bootstrap peers connected",
                succeeded.len(),
                remaining.len()
            );

            // Retry rounds for failed peers
            // NOTE: retries depend only on `failed`. They still run when
            // `min_connected` has already been reached.
            for round in 2..=3 {
                if failed.is_empty() {
                    break;
                }
                // 10 s before round 2, 15 s before round 3.
                let delay = std::time::Duration::from_secs(if round == 2 { 10 } else { 15 });
                tracing::info!(
                    "Retrying {} failed peers in {}s (round {})",
                    failed.len(),
                    delay.as_secs(),
                    round
                );
                tokio::time::sleep(delay).await;

                let (succeeded, still_failed) =
                    self.connect_peers_parallel_tracked(network, &failed).await;
                all_connected.extend(&succeeded);
                failed = still_failed;
                tracing::info!(
                    "Phase 2 round {}: {} total peers connected",
                    round,
                    all_connected.len()
                );
            }

            if !failed.is_empty() {
                tracing::warn!(
                    "Could not connect to {} bootstrap peers: {:?}",
                    failed.len(),
                    failed
                );
            }
        }

        tracing::info!(
            "Network join complete. Connected to {} peers.",
            all_connected.len()
        );

        // Join the HyParView membership overlay via connected peers.
        // Seeds are the connected socket addresses rendered as strings.
        if let Some(ref runtime) = self.gossip_runtime {
            let seeds: Vec<String> = all_connected.iter().map(|addr| addr.to_string()).collect();
            if !seeds.is_empty() {
                if let Err(e) = runtime.membership().join(seeds).await {
                    tracing::warn!("HyParView membership join failed: {e}");
                }
            }
        }

        // Start presence beacons after membership overlay is established.
        if let Some(ref pw) = self.presence {
            // Seed broadcast peers from connected peers so beacons propagate.
            if let Some(ref runtime) = self.gossip_runtime {
                let active = runtime.membership().active_view();
                for peer in active {
                    pw.manager().add_broadcast_peer(peer).await;
                }
                tracing::info!(
                    "Presence seeded with {} broadcast peers",
                    pw.manager().broadcast_peer_count().await
                );
            }

            // Populate address hints from network status for beacon metadata.
            // Hints are the node's external addresses followed by its local address.
            if let Some(ref net) = self.network {
                if let Some(status) = net.node_status().await {
                    let mut hints: Vec<String> = status
                        .external_addrs
                        .iter()
                        .map(|a| a.to_string())
                        .collect();
                    hints.push(status.local_addr.to_string());
                    pw.manager().set_addr_hints(hints).await;
                }
            }

            if pw.config().enable_beacons {
                if let Err(e) = pw
                    .manager()
                    .start_beacons(pw.config().beacon_interval_secs)
                    .await
                {
                    tracing::warn!("Failed to start presence beacons: {e}");
                } else {
                    tracing::info!(
                        "Presence beacons started (interval={}s)",
                        pw.config().beacon_interval_secs
                    );
                }
            }

            // Start the presence event-emission loop so that subscribers
            // automatically receive AgentOnline/AgentOffline events after
            // join_network() returns.
            pw.start_event_loop(std::sync::Arc::clone(&self.identity_discovery_cache))
                .await;
            tracing::debug!("Presence event loop started");
        }

        // Announce without user identity (include_user_identity = false,
        // human_consent = false). Failures here are logged, not returned.
        if let Err(e) = self.announce_identity(false, false).await {
            tracing::warn!("Initial identity announcement failed: {}", e);
        }
        if let Err(e) = self.start_identity_heartbeat().await {
            tracing::warn!("Failed to start identity heartbeat: {e}");
        }

        Ok(())
    }
1887
1888    /// Connect to cached peers in parallel, returning (succeeded, failed) peer lists.
1889    async fn connect_cached_peers_parallel_tracked(
1890        &self,
1891        network: &std::sync::Arc<network::NetworkNode>,
1892        peers: &[ant_quic::CachedPeer],
1893    ) -> (Vec<std::net::SocketAddr>, Vec<ant_quic::PeerId>) {
1894        let handles: Vec<_> = peers
1895            .iter()
1896            .map(|peer| {
1897                let net = network.clone();
1898                let peer_id = peer.peer_id;
1899                tokio::spawn(async move {
1900                    tracing::debug!("Connecting to cached peer: {:?}", peer_id);
1901                    match net.connect_cached_peer(peer_id).await {
1902                        Ok(addr) => {
1903                            tracing::info!("Connected to cached peer {:?} at {}", peer_id, addr);
1904                            Ok(addr)
1905                        }
1906                        Err(e) => {
1907                            tracing::warn!("Failed to connect to cached peer {:?}: {}", peer_id, e);
1908                            Err(peer_id)
1909                        }
1910                    }
1911                })
1912            })
1913            .collect();
1914
1915        let mut succeeded = Vec::new();
1916        let mut failed = Vec::new();
1917        for handle in handles {
1918            match handle.await {
1919                Ok(Ok(addr)) => succeeded.push(addr),
1920                Ok(Err(peer_id)) => failed.push(peer_id),
1921                Err(e) => tracing::error!("Connection task panicked: {}", e),
1922            }
1923        }
1924        (succeeded, failed)
1925    }
1926
1927    /// Connect to multiple peers in parallel, returning (succeeded, failed) address lists.
1928    async fn connect_peers_parallel_tracked(
1929        &self,
1930        network: &std::sync::Arc<network::NetworkNode>,
1931        addrs: &[std::net::SocketAddr],
1932    ) -> (Vec<std::net::SocketAddr>, Vec<std::net::SocketAddr>) {
1933        let handles: Vec<_> = addrs
1934            .iter()
1935            .map(|addr| {
1936                let net = network.clone();
1937                let addr = *addr;
1938                tokio::spawn(async move {
1939                    tracing::debug!("Connecting to peer: {}", addr);
1940                    match net.connect_addr(addr).await {
1941                        Ok(_) => {
1942                            tracing::info!("Connected to peer: {}", addr);
1943                            Ok(addr)
1944                        }
1945                        Err(e) => {
1946                            tracing::warn!("Failed to connect to {}: {}", addr, e);
1947                            Err(addr)
1948                        }
1949                    }
1950                })
1951            })
1952            .collect();
1953
1954        let mut succeeded = Vec::new();
1955        let mut failed = Vec::new();
1956        for handle in handles {
1957            match handle.await {
1958                Ok(Ok(addr)) => succeeded.push(addr),
1959                Ok(Err(addr)) => failed.push(addr),
1960                Err(e) => tracing::error!("Connection task panicked: {}", e),
1961            }
1962        }
1963        (succeeded, failed)
1964    }
1965
1966    /// Subscribe to messages on a given topic.
1967    ///
1968    /// Returns a [`gossip::Subscription`] that yields messages as they arrive
1969    /// through the gossip network.
1970    ///
1971    /// # Errors
1972    ///
1973    /// Returns an error if:
1974    /// - Gossip runtime is not initialized (configure agent with network first)
1975    pub async fn subscribe(&self, topic: &str) -> error::Result<Subscription> {
1976        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1977            error::IdentityError::Storage(std::io::Error::other(
1978                "gossip runtime not initialized - configure agent with network first",
1979            ))
1980        })?;
1981        Ok(runtime.pubsub().subscribe(topic.to_string()).await)
1982    }
1983
1984    /// Publish a message to a topic.
1985    ///
1986    /// The message will propagate through the gossip network via
1987    /// epidemic broadcast — every agent that receives it will
1988    /// relay it to its neighbours.
1989    ///
1990    /// # Errors
1991    ///
1992    /// Returns an error if:
1993    /// - Gossip runtime is not initialized (configure agent with network first)
1994    /// - Message encoding or broadcast fails
1995    pub async fn publish(&self, topic: &str, payload: Vec<u8>) -> error::Result<()> {
1996        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1997            error::IdentityError::Storage(std::io::Error::other(
1998                "gossip runtime not initialized - configure agent with network first",
1999            ))
2000        })?;
2001        runtime
2002            .pubsub()
2003            .publish(topic.to_string(), bytes::Bytes::from(payload))
2004            .await
2005            .map_err(|e| {
2006                error::IdentityError::Storage(std::io::Error::other(format!(
2007                    "publish failed: {}",
2008                    e
2009                )))
2010            })
2011    }
2012
2013    /// Get connected peer IDs.
2014    ///
2015    /// Returns the list of peers currently connected via the gossip network.
2016    ///
2017    /// # Errors
2018    ///
2019    /// Returns an error if the network is not initialized.
2020    pub async fn peers(&self) -> error::Result<Vec<saorsa_gossip_types::PeerId>> {
2021        let network = self.network.as_ref().ok_or_else(|| {
2022            error::IdentityError::Storage(std::io::Error::other(
2023                "network not initialized - configure agent with network first",
2024            ))
2025        })?;
2026        let ant_peers = network.connected_peers().await;
2027        Ok(ant_peers
2028            .into_iter()
2029            .map(|p| saorsa_gossip_types::PeerId::new(p.0))
2030            .collect())
2031    }
2032
2033    /// Get online agents.
2034    ///
2035    /// Returns agent IDs discovered from signed identity announcements.
2036    ///
2037    /// # Errors
2038    ///
2039    /// Returns an error if the gossip runtime is not initialized.
2040    pub async fn presence(&self) -> error::Result<Vec<identity::AgentId>> {
2041        self.start_identity_listener().await?;
2042        let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
2043        let mut agents: Vec<_> = self
2044            .identity_discovery_cache
2045            .read()
2046            .await
2047            .values()
2048            .filter(|a| a.announced_at >= cutoff)
2049            .map(|a| a.agent_id)
2050            .collect();
2051        agents.sort_by(|a, b| a.0.cmp(&b.0));
2052        Ok(agents)
2053    }
2054
2055    /// Subscribe to presence events (agent online/offline notifications).
2056    ///
2057    /// Returns a [`tokio::sync::broadcast::Receiver<PresenceEvent>`] that yields
2058    /// [`presence::PresenceEvent`] values as agents come online or go offline.
2059    ///
2060    /// The diff-based event emission loop is started lazily on the first call to this
2061    /// method (or when [`join_network`](Agent::join_network) is called). Subsequent
2062    /// calls return independent receivers on the same broadcast channel.
2063    ///
2064    /// # Errors
2065    ///
2066    /// Returns [`error::NetworkError::NodeError`] if this agent was built
2067    /// without a network configuration (i.e. no `with_network_config` on the builder).
2068    pub async fn subscribe_presence(
2069        &self,
2070    ) -> error::NetworkResult<tokio::sync::broadcast::Receiver<presence::PresenceEvent>> {
2071        let pw = self.presence.as_ref().ok_or_else(|| {
2072            error::NetworkError::NodeError("presence system not initialized".to_string())
2073        })?;
2074        // Ensure the event loop is running.
2075        pw.start_event_loop(std::sync::Arc::clone(&self.identity_discovery_cache))
2076            .await;
2077        Ok(pw.subscribe_events())
2078    }
2079
2080    /// Look up a single agent in the local discovery cache.
2081    ///
2082    /// Returns `None` if the agent is not currently cached.  No network I/O is
2083    /// performed — use [`discover_agent_by_id`](Agent::discover_agent_by_id) for
2084    /// an active lookup that queries the network.
2085    pub async fn cached_agent(&self, id: &identity::AgentId) -> Option<DiscoveredAgent> {
2086        self.identity_discovery_cache.read().await.get(id).cloned()
2087    }
2088
2089    /// Discover agents via Friend-of-a-Friend (FOAF) random walk.
2090    ///
2091    /// Initiates a FOAF query on the global presence topic with the given `ttl`
2092    /// (maximum hop count) and `timeout_ms` (response collection window).
2093    ///
2094    /// Returned entries are resolved against the local identity discovery cache
2095    /// so that known agents are returned with full identity data.  Unknown peers
2096    /// are included with a minimal entry (addresses only) that will be enriched
2097    /// once their identity heartbeat arrives.
2098    ///
2099    /// # Arguments
2100    ///
2101    /// * `ttl` — Maximum hop count for the random walk (`1`–`5`). Typical: `2`.
2102    /// * `timeout_ms` — Query timeout in milliseconds. Typical: `5000`.
2103    ///
2104    /// # Errors
2105    ///
2106    /// Returns [`error::NetworkError::NodeError`] if no network config was provided.
2107    pub async fn discover_agents_foaf(
2108        &self,
2109        ttl: u8,
2110        timeout_ms: u64,
2111    ) -> error::NetworkResult<Vec<DiscoveredAgent>> {
2112        let pw = self.presence.as_ref().ok_or_else(|| {
2113            error::NetworkError::NodeError("presence system not initialized".to_string())
2114        })?;
2115
2116        let topic = presence::global_presence_topic();
2117        let raw_results: Vec<(
2118            saorsa_gossip_types::PeerId,
2119            saorsa_gossip_types::PresenceRecord,
2120        )> = pw
2121            .manager()
2122            .initiate_foaf_query(topic, ttl, timeout_ms)
2123            .await
2124            .map_err(|e| error::NetworkError::NodeError(e.to_string()))?;
2125
2126        let cache = self.identity_discovery_cache.read().await;
2127
2128        // Convert and deduplicate by agent_id.
2129        let mut seen: std::collections::HashSet<identity::AgentId> =
2130            std::collections::HashSet::new();
2131        let mut agents: Vec<DiscoveredAgent> = Vec::with_capacity(raw_results.len());
2132
2133        for (peer_id, record) in &raw_results {
2134            if let Some(agent) =
2135                presence::presence_record_to_discovered_agent(*peer_id, record, &cache)
2136            {
2137                if seen.insert(agent.agent_id) {
2138                    agents.push(agent);
2139                }
2140            }
2141        }
2142
2143        Ok(agents)
2144    }
2145
2146    /// Discover a specific agent by their [`identity::AgentId`] via FOAF random walk.
2147    ///
2148    /// Fast-path: checks the local identity discovery cache first and returns
2149    /// immediately if the agent is already known.
2150    ///
2151    /// Slow-path: performs a FOAF random walk (see [`discover_agents_foaf`](Agent::discover_agents_foaf))
2152    /// and searches the results for a matching `AgentId`.
2153    ///
2154    /// Returns `None` if the agent is not found within the given `ttl` and `timeout_ms`.
2155    ///
2156    /// # Errors
2157    ///
2158    /// Returns [`error::NetworkError::NodeCreation`] if no network config was provided.
2159    pub async fn discover_agent_by_id(
2160        &self,
2161        target_id: identity::AgentId,
2162        ttl: u8,
2163        timeout_ms: u64,
2164    ) -> error::NetworkResult<Option<DiscoveredAgent>> {
2165        // Fast path: already in local cache.
2166        {
2167            let cache = self.identity_discovery_cache.read().await;
2168            if let Some(agent) = cache.get(&target_id) {
2169                return Ok(Some(agent.clone()));
2170            }
2171        }
2172
2173        // Slow path: FOAF random walk.
2174        let agents = self.discover_agents_foaf(ttl, timeout_ms).await?;
2175        Ok(agents.into_iter().find(|a| a.agent_id == target_id))
2176    }
2177
2178    /// Find an agent by ID, returning its known addresses.
2179    ///
2180    /// Performs a three-stage lookup:
2181    /// 1. **Cache hit** — return addresses immediately if the agent has already
2182    ///    been discovered.
2183    /// 2. **Shard subscription** — subscribe to the agent's identity shard topic
2184    ///    and wait up to 5 seconds for a heartbeat announcement.
2185    /// 3. **Rendezvous** — subscribe to the agent's rendezvous shard topic and
2186    ///    wait up to 5 seconds for a `ProviderSummary` advertisement.  This
2187    ///    works even when the two agents are on different gossip overlay clusters.
2188    ///
2189    /// Returns `None` if the agent is not found within the combined deadline.
2190    ///
2191    /// # Errors
2192    ///
2193    /// Returns an error if the gossip runtime is not initialized.
2194    pub async fn find_agent(
2195        &self,
2196        agent_id: identity::AgentId,
2197    ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
2198        self.start_identity_listener().await?;
2199
2200        // Stage 1: cache hit.
2201        if let Some(addrs) = self
2202            .identity_discovery_cache
2203            .read()
2204            .await
2205            .get(&agent_id)
2206            .map(|e| e.addresses.clone())
2207        {
2208            return Ok(Some(addrs));
2209        }
2210
2211        // Stage 2: subscribe to the agent's identity shard topic and wait up to 5 s.
2212        let runtime = match self.gossip_runtime.as_ref() {
2213            Some(r) => r,
2214            None => return Ok(None),
2215        };
2216        let shard_topic = shard_topic_for_agent(&agent_id);
2217        let mut sub = runtime.pubsub().subscribe(shard_topic).await;
2218        let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
2219        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
2220
2221        loop {
2222            if tokio::time::Instant::now() >= deadline {
2223                break;
2224            }
2225            let timeout = tokio::time::sleep_until(deadline);
2226            tokio::select! {
2227                Some(msg) = sub.recv() => {
2228                    if let Ok(ann) = {
2229                        use bincode::Options;
2230                        bincode::DefaultOptions::new()
2231                            .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
2232                            .deserialize::<IdentityAnnouncement>(&msg.payload)
2233                    } {
2234                        if ann.verify().is_ok() && ann.agent_id == agent_id {
2235                            let now = std::time::SystemTime::now()
2236                                .duration_since(std::time::UNIX_EPOCH)
2237                                .map_or(0, |d| d.as_secs());
2238                            let addrs = ann.addresses.clone();
2239                            cache.write().await.insert(
2240                                ann.agent_id,
2241                                DiscoveredAgent {
2242                                    agent_id: ann.agent_id,
2243                                    machine_id: ann.machine_id,
2244                                    user_id: ann.user_id,
2245                                    addresses: ann.addresses,
2246                                    announced_at: ann.announced_at,
2247                                    last_seen: now,
2248                                    machine_public_key: ann.machine_public_key.clone(),
2249                                    nat_type: ann.nat_type.clone(),
2250                                    can_receive_direct: ann.can_receive_direct,
2251                                    is_relay: ann.is_relay,
2252                                    is_coordinator: ann.is_coordinator,
2253                                },
2254                            );
2255                            return Ok(Some(addrs));
2256                        }
2257                    }
2258                }
2259                _ = timeout => break,
2260            }
2261        }
2262
2263        // Stage 3: rendezvous shard subscription — wait up to 5 s.
2264        // Cache the result so subsequent connect_to_agent / send_direct can find it.
2265        if let Some(addrs) = self.find_agent_rendezvous(agent_id, 5).await? {
2266            let now = std::time::SystemTime::now()
2267                .duration_since(std::time::UNIX_EPOCH)
2268                .map_or(0, |d| d.as_secs());
2269            cache
2270                .write()
2271                .await
2272                .entry(agent_id)
2273                .or_insert_with(|| DiscoveredAgent {
2274                    agent_id,
2275                    machine_id: identity::MachineId([0u8; 32]),
2276                    user_id: None,
2277                    addresses: addrs.clone(),
2278                    announced_at: now,
2279                    last_seen: now,
2280                    machine_public_key: Vec::new(),
2281                    nat_type: None,
2282                    can_receive_direct: None,
2283                    is_relay: None,
2284                    is_coordinator: None,
2285                });
2286            return Ok(Some(addrs));
2287        }
2288
2289        Ok(None)
2290    }
2291
2292    /// Find all discovered agents claiming ownership by the given [`identity::UserId`].
2293    ///
2294    /// Only returns agents that announced with `include_user_identity: true`
2295    /// (i.e., agents whose [`DiscoveredAgent::user_id`] is `Some`).
2296    ///
2297    /// # Arguments
2298    ///
2299    /// * `user_id` - The user identity to search for
2300    ///
2301    /// # Errors
2302    ///
2303    /// Returns an error if the gossip runtime is not initialized.
2304    pub async fn find_agents_by_user(
2305        &self,
2306        user_id: identity::UserId,
2307    ) -> error::Result<Vec<DiscoveredAgent>> {
2308        self.start_identity_listener().await?;
2309        let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
2310        Ok(self
2311            .identity_discovery_cache
2312            .read()
2313            .await
2314            .values()
2315            .filter(|a| a.announced_at >= cutoff && a.user_id == Some(user_id))
2316            .cloned()
2317            .collect())
2318    }
2319
2320    /// Return the local socket address this agent's network node is bound to, if any.
2321    ///
2322    /// Returns `None` if no network has been configured or if the bind address is
2323    /// not yet known.
2324    #[must_use]
2325    pub fn local_addr(&self) -> Option<std::net::SocketAddr> {
2326        self.network.as_ref().and_then(|n| n.local_addr())
2327    }
2328
2329    /// Build a signed [`IdentityAnnouncement`] for this agent.
2330    ///
2331    /// Delegates to the internal `build_identity_announcement` method.
2332    ///
2333    /// # Errors
2334    ///
2335    /// Returns an error if key signing fails or human consent is required but not given.
2336    pub fn build_announcement(
2337        &self,
2338        include_user: bool,
2339        consent: bool,
2340    ) -> error::Result<IdentityAnnouncement> {
2341        self.build_identity_announcement(include_user, consent)
2342    }
2343
2344    /// Start the background identity heartbeat task.
2345    ///
2346    /// Idempotent — if the heartbeat is already running, returns `Ok(())` immediately.
2347    /// The heartbeat re-announces this agent's identity at `heartbeat_interval_secs`
2348    /// intervals so that late-joining peers can discover it without waiting for a
2349    /// Start the direct message listener background task.
2350    ///
2351    /// This task reads raw direct messages from the network layer and
2352    /// dispatches them to `DirectMessaging::handle_incoming()`, which
2353    /// broadcasts to all `subscribe_direct()` receivers.
2354    ///
2355    /// Called automatically by [`Agent::join_network`].
2356    fn start_direct_listener(&self) {
2357        if self
2358            .direct_listener_started
2359            .swap(true, std::sync::atomic::Ordering::AcqRel)
2360        {
2361            return;
2362        }
2363
2364        let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
2365            return;
2366        };
2367        let dm = std::sync::Arc::clone(&self.direct_messaging);
2368
2369        tokio::spawn(async move {
2370            tracing::info!("Direct message listener started");
2371            loop {
2372                let Some((ant_peer_id, payload)) = network.recv_direct().await else {
2373                    tracing::debug!("Direct message channel closed");
2374                    break;
2375                };
2376
2377                // Parse: first 32 bytes = sender AgentId, rest = payload
2378                if payload.len() < 32 {
2379                    tracing::warn!("Direct message too short ({} bytes)", payload.len());
2380                    continue;
2381                }
2382
2383                let mut sender_bytes = [0u8; 32];
2384                sender_bytes.copy_from_slice(&payload[..32]);
2385                let sender = identity::AgentId(sender_bytes);
2386                let machine_id = identity::MachineId(ant_peer_id.0);
2387                let data = payload[32..].to_vec();
2388
2389                // Register the agent→machine mapping for future lookups
2390                dm.register_agent(sender, machine_id).await;
2391
2392                // Broadcast to all subscribe_direct() receivers
2393                dm.handle_incoming(machine_id, sender, data).await;
2394            }
2395        });
2396    }
2397
2398    /// new announcement.
2399    ///
2400    /// Called automatically by [`Agent::join_network`].
2401    ///
2402    /// # Errors
2403    ///
2404    /// Returns an error if a required network or gossip component is missing.
2405    pub async fn start_identity_heartbeat(&self) -> error::Result<()> {
2406        let mut handle_guard = self.heartbeat_handle.lock().await;
2407        if handle_guard.is_some() {
2408            return Ok(());
2409        }
2410        let Some(runtime) = self.gossip_runtime.as_ref().map(std::sync::Arc::clone) else {
2411            return Err(error::IdentityError::Storage(std::io::Error::other(
2412                "gossip runtime not initialized — cannot start heartbeat",
2413            )));
2414        };
2415        let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
2416            return Err(error::IdentityError::Storage(std::io::Error::other(
2417                "network not initialized — cannot start heartbeat",
2418            )));
2419        };
2420        let ctx = HeartbeatContext {
2421            identity: std::sync::Arc::clone(&self.identity),
2422            runtime,
2423            network,
2424            interval_secs: self.heartbeat_interval_secs,
2425            cache: std::sync::Arc::clone(&self.identity_discovery_cache),
2426        };
2427        let handle = tokio::task::spawn(async move {
2428            let mut ticker =
2429                tokio::time::interval(std::time::Duration::from_secs(ctx.interval_secs));
2430            ticker.tick().await; // skip first immediate tick
2431            loop {
2432                ticker.tick().await;
2433                if let Err(e) = ctx.announce().await {
2434                    tracing::warn!("identity heartbeat announce failed: {e}");
2435                }
2436            }
2437        });
2438        *handle_guard = Some(handle);
2439        Ok(())
2440    }
2441
2442    /// Publish a rendezvous `ProviderSummary` for this agent.
2443    ///
2444    /// Enables global findability across gossip overlay partitions.  Seekers
2445    /// that have never been on the same partition as this agent can still
2446    /// discover it by subscribing to the rendezvous shard topic and waiting
2447    /// for the next heartbeat advertisement.
2448    ///
2449    /// The summary is signed with this agent's machine key and contains the
2450    /// agent's reachability addresses in the `extensions` field (bincode-encoded
2451    /// `Vec<SocketAddr>`).
2452    ///
2453    /// # Re-advertisement contract
2454    ///
2455    /// Rendezvous summaries expire after `validity_ms` milliseconds.  **Callers
2456    /// are responsible for calling `advertise_identity` again before expiry** so
2457    /// that seekers can always find a fresh record.  A common strategy is to
2458    /// re-advertise every `validity_ms / 2`.  The `x0xd` daemon does this
2459    /// automatically via its background re-advertisement task.
2460    ///
2461    /// # Arguments
2462    ///
2463    /// * `validity_ms` — How long (milliseconds) before the summary expires.
2464    ///   After this time, seekers will no longer discover this agent via rendezvous
2465    ///   unless a fresh `advertise_identity` call is made.
2466    ///
2467    /// # Errors
2468    ///
2469    /// Returns an error if the gossip runtime is not initialized, serialization
2470    /// fails, or signing fails.
2471    pub async fn advertise_identity(&self, validity_ms: u64) -> error::Result<()> {
2472        use saorsa_gossip_rendezvous::{Capability, ProviderSummary};
2473
2474        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2475            error::IdentityError::Storage(std::io::Error::other(
2476                "gossip runtime not initialized — cannot advertise identity",
2477            ))
2478        })?;
2479
2480        let peer_id = runtime.peer_id();
2481        let addresses = self.announcement_addresses();
2482        let addr_bytes = bincode::serialize(&addresses).map_err(|e| {
2483            error::IdentityError::Serialization(format!(
2484                "failed to serialize addresses for rendezvous: {e}"
2485            ))
2486        })?;
2487
2488        let mut summary = ProviderSummary::new(
2489            self.agent_id().0,
2490            peer_id,
2491            vec![Capability::Identity],
2492            validity_ms,
2493        )
2494        .with_extensions(addr_bytes);
2495
2496        summary
2497            .sign_raw(self.identity.machine_keypair().secret_key().as_bytes())
2498            .map_err(|e| {
2499                error::IdentityError::Storage(std::io::Error::other(format!(
2500                    "failed to sign rendezvous summary: {e}"
2501                )))
2502            })?;
2503
2504        let cbor_bytes = summary.to_cbor().map_err(|e| {
2505            error::IdentityError::Serialization(format!(
2506                "failed to CBOR-encode rendezvous summary: {e}"
2507            ))
2508        })?;
2509
2510        let topic = rendezvous_shard_topic_for_agent(&self.agent_id());
2511        runtime
2512            .pubsub()
2513            .publish(topic, bytes::Bytes::from(cbor_bytes))
2514            .await
2515            .map_err(|e| {
2516                error::IdentityError::Storage(std::io::Error::other(format!(
2517                    "failed to publish rendezvous summary: {e}"
2518                )))
2519            })?;
2520
2521        self.rendezvous_advertised
2522            .store(true, std::sync::atomic::Ordering::Relaxed);
2523        Ok(())
2524    }
2525
2526    /// Search for an agent via rendezvous shard subscription.
2527    ///
2528    /// Subscribes to the rendezvous shard topic for `agent_id` and waits up to
2529    /// `timeout_secs` for a matching [`saorsa_gossip_rendezvous::ProviderSummary`].
2530    /// On success the addresses encoded in the summary `extensions` field are
2531    /// returned.
2532    ///
2533    /// This is Stage 3 of [`Agent::find_agent`]'s lookup cascade.
2534    ///
2535    /// # Errors
2536    ///
2537    /// Returns an error if the gossip runtime is not initialized.
2538    pub async fn find_agent_rendezvous(
2539        &self,
2540        agent_id: identity::AgentId,
2541        timeout_secs: u64,
2542    ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
2543        use saorsa_gossip_rendezvous::ProviderSummary;
2544
2545        let runtime = match self.gossip_runtime.as_ref() {
2546            Some(r) => r,
2547            None => return Ok(None),
2548        };
2549
2550        let topic = rendezvous_shard_topic_for_agent(&agent_id);
2551        let mut sub = runtime.pubsub().subscribe(topic).await;
2552        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
2553
2554        loop {
2555            if tokio::time::Instant::now() >= deadline {
2556                break;
2557            }
2558            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
2559            tokio::select! {
2560                Some(msg) = sub.recv() => {
2561                    let summary = match ProviderSummary::from_cbor(&msg.payload) {
2562                        Ok(s) => s,
2563                        Err(_) => continue,
2564                    };
2565                    if summary.target != agent_id.0 {
2566                        continue;
2567                    }
2568                    // Verify the summary signature when the advertiser's machine
2569                    // public key is cached from a prior identity announcement.
2570                    // Without a cached key we still accept the addresses — they
2571                    // are connection hints only; the subsequent QUIC handshake will
2572                    // fail cryptographically if the endpoint is not the genuine agent.
2573                    let cached_pub = self
2574                        .identity_discovery_cache
2575                        .read()
2576                        .await
2577                        .get(&agent_id)
2578                        .map(|e| e.machine_public_key.clone());
2579                    if let Some(pub_bytes) = cached_pub {
2580                        if !pub_bytes.is_empty()
2581                            && !summary.verify_raw(&pub_bytes).unwrap_or(false)
2582                        {
2583                            tracing::warn!(
2584                                "Rendezvous summary signature verification failed for agent {:?}; discarding",
2585                                agent_id
2586                            );
2587                            continue;
2588                        }
2589                    }
2590                    // Decode addresses from the extensions field.
2591                    let addrs: Vec<std::net::SocketAddr> = summary
2592                        .extensions
2593                        .as_deref()
2594                        .and_then(|b| {
2595                            use bincode::Options;
2596                            bincode::DefaultOptions::new()
2597                                .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
2598                                .deserialize(b)
2599                                .ok()
2600                        })
2601                        .unwrap_or_default();
2602                    if !addrs.is_empty() {
2603                        return Ok(Some(addrs));
2604                    }
2605                }
2606                _ = tokio::time::sleep(remaining) => break,
2607            }
2608        }
2609
2610        Ok(None)
2611    }
2612
2613    /// Insert a discovered agent into the cache (for testing only).
2614    ///
2615    /// # Arguments
2616    ///
2617    /// * `agent` - The agent entry to insert.
2618    #[doc(hidden)]
2619    pub async fn insert_discovered_agent_for_testing(&self, agent: DiscoveredAgent) {
2620        self.identity_discovery_cache
2621            .write()
2622            .await
2623            .insert(agent.agent_id, agent);
2624    }
2625
2626    /// Create a new collaborative task list bound to a topic.
2627    ///
2628    /// Creates a new `TaskList` and binds it to the specified gossip topic
2629    /// for automatic synchronization with other agents on the same topic.
2630    ///
2631    /// # Arguments
2632    ///
2633    /// * `name` - Human-readable name for the task list
2634    /// * `topic` - Gossip topic for synchronization
2635    ///
2636    /// # Returns
2637    ///
2638    /// A `TaskListHandle` for interacting with the task list.
2639    ///
2640    /// # Errors
2641    ///
2642    /// Returns an error if the gossip runtime is not initialized.
2643    ///
2644    /// # Example
2645    ///
2646    /// ```ignore
2647    /// let list = agent.create_task_list("Sprint Planning", "team-sprint").await?;
2648    /// ```
2649    pub async fn create_task_list(&self, name: &str, topic: &str) -> error::Result<TaskListHandle> {
2650        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2651            error::IdentityError::Storage(std::io::Error::other(
2652                "gossip runtime not initialized - configure agent with network first",
2653            ))
2654        })?;
2655
2656        let peer_id = runtime.peer_id();
2657        let list_id = crdt::TaskListId::from_content(name, &self.agent_id(), 0);
2658        let task_list = crdt::TaskList::new(list_id, name.to_string(), peer_id);
2659
2660        let sync = crdt::TaskListSync::new(
2661            task_list,
2662            std::sync::Arc::clone(runtime.pubsub()),
2663            topic.to_string(),
2664            30,
2665        )
2666        .map_err(|e| {
2667            error::IdentityError::Storage(std::io::Error::other(format!(
2668                "task list sync creation failed: {}",
2669                e
2670            )))
2671        })?;
2672
2673        let sync = std::sync::Arc::new(sync);
2674        sync.start().await.map_err(|e| {
2675            error::IdentityError::Storage(std::io::Error::other(format!(
2676                "task list sync start failed: {}",
2677                e
2678            )))
2679        })?;
2680
2681        Ok(TaskListHandle {
2682            sync,
2683            agent_id: self.agent_id(),
2684            peer_id,
2685        })
2686    }
2687
2688    /// Join an existing task list by topic.
2689    ///
2690    /// Connects to a task list that was created by another agent on the
2691    /// specified topic. The local replica will sync with peers automatically.
2692    ///
2693    /// # Arguments
2694    ///
2695    /// * `topic` - Gossip topic for the task list
2696    ///
2697    /// # Returns
2698    ///
2699    /// A `TaskListHandle` for interacting with the task list.
2700    ///
2701    /// # Errors
2702    ///
2703    /// Returns an error if the gossip runtime is not initialized.
2704    ///
2705    /// # Example
2706    ///
2707    /// ```ignore
2708    /// let list = agent.join_task_list("team-sprint").await?;
2709    /// ```
2710    pub async fn join_task_list(&self, topic: &str) -> error::Result<TaskListHandle> {
2711        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2712            error::IdentityError::Storage(std::io::Error::other(
2713                "gossip runtime not initialized - configure agent with network first",
2714            ))
2715        })?;
2716
2717        let peer_id = runtime.peer_id();
2718        // Create empty task list; it will be populated via delta sync
2719        let list_id = crdt::TaskListId::from_content(topic, &self.agent_id(), 0);
2720        let task_list = crdt::TaskList::new(list_id, String::new(), peer_id);
2721
2722        let sync = crdt::TaskListSync::new(
2723            task_list,
2724            std::sync::Arc::clone(runtime.pubsub()),
2725            topic.to_string(),
2726            30,
2727        )
2728        .map_err(|e| {
2729            error::IdentityError::Storage(std::io::Error::other(format!(
2730                "task list sync creation failed: {}",
2731                e
2732            )))
2733        })?;
2734
2735        let sync = std::sync::Arc::new(sync);
2736        sync.start().await.map_err(|e| {
2737            error::IdentityError::Storage(std::io::Error::other(format!(
2738                "task list sync start failed: {}",
2739                e
2740            )))
2741        })?;
2742
2743        Ok(TaskListHandle {
2744            sync,
2745            agent_id: self.agent_id(),
2746            peer_id,
2747        })
2748    }
2749}
2750
2751impl AgentBuilder {
2752    /// Set a custom path for the machine keypair.
2753    ///
2754    /// If not set, the machine keypair is stored in `~/.x0x/machine.key`.
2755    ///
2756    /// # Arguments
2757    ///
2758    /// * `path` - The path to store the machine keypair.
2759    ///
2760    /// # Returns
2761    ///
2762    /// Self for chaining.
2763    pub fn with_machine_key<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2764        self.machine_key_path = Some(path.as_ref().to_path_buf());
2765        self
2766    }
2767
2768    /// Import an agent keypair.
2769    ///
2770    /// If not set, the agent keypair is loaded from storage (or generated fresh
2771    /// if no stored key exists).
2772    ///
2773    /// This enables running the same agent on multiple machines by importing
2774    /// the same agent keypair (but with different machine keypairs).
2775    ///
2776    /// Note: When an explicit keypair is provided via this method, it takes
2777    /// precedence over `with_agent_key_path()`.
2778    ///
2779    /// # Arguments
2780    ///
2781    /// * `keypair` - The agent keypair to import.
2782    ///
2783    /// # Returns
2784    ///
2785    /// Self for chaining.
2786    pub fn with_agent_key(mut self, keypair: identity::AgentKeypair) -> Self {
2787        self.agent_keypair = Some(keypair);
2788        self
2789    }
2790
2791    /// Set a custom path for the agent keypair.
2792    ///
2793    /// If not set, the agent keypair is stored in `~/.x0x/agent.key`.
2794    /// If no stored key is found at the path, a fresh one is generated and saved.
2795    ///
2796    /// This is ignored when `with_agent_key()` provides an explicit keypair.
2797    ///
2798    /// # Arguments
2799    ///
2800    /// * `path` - The path to store/load the agent keypair.
2801    ///
2802    /// # Returns
2803    ///
2804    /// Self for chaining.
2805    pub fn with_agent_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2806        self.agent_key_path = Some(path.as_ref().to_path_buf());
2807        self
2808    }
2809
2810    /// Set network configuration for P2P communication.
2811    ///
2812    /// If not set, default network configuration is used.
2813    ///
2814    /// # Arguments
2815    ///
2816    /// * `config` - The network configuration to use.
2817    ///
2818    /// # Returns
2819    ///
2820    /// Self for chaining.
2821    pub fn with_network_config(mut self, config: network::NetworkConfig) -> Self {
2822        self.network_config = Some(config);
2823        self
2824    }
2825
2826    /// Set the directory for the bootstrap peer cache.
2827    ///
2828    /// The cache persists peer quality metrics across restarts, enabling
2829    /// cache-first join strategy. Defaults to `~/.x0x/peers/` if not set.
2830    /// Falls back to `./.x0x/peers/` (relative to CWD) if `$HOME` is unset.
2831    pub fn with_peer_cache_dir<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2832        self.peer_cache_dir = Some(path.as_ref().to_path_buf());
2833        self
2834    }
2835
2836    /// Import a user keypair for three-layer identity.
2837    ///
2838    /// This binds a human identity to this agent. When provided, an
2839    /// [`identity::AgentCertificate`] is automatically issued (if one
2840    /// doesn't already exist in storage) to cryptographically attest
2841    /// that this agent belongs to the user.
2842    ///
2843    /// Note: When an explicit keypair is provided via this method, it takes
2844    /// precedence over `with_user_key_path()`.
2845    ///
2846    /// # Arguments
2847    ///
2848    /// * `keypair` - The user keypair to import.
2849    ///
2850    /// # Returns
2851    ///
2852    /// Self for chaining.
2853    pub fn with_user_key(mut self, keypair: identity::UserKeypair) -> Self {
2854        self.user_keypair = Some(keypair);
2855        self
2856    }
2857
2858    /// Set a custom path for the user keypair.
2859    ///
2860    /// Unlike machine and agent keys, user keys are **not** auto-generated.
2861    /// If the file at this path doesn't exist, no user identity is set
2862    /// (the agent operates with two-layer identity).
2863    ///
2864    /// This is ignored when `with_user_key()` provides an explicit keypair.
2865    ///
2866    /// # Arguments
2867    ///
2868    /// * `path` - The path to load the user keypair from.
2869    ///
2870    /// # Returns
2871    ///
2872    /// Self for chaining.
2873    pub fn with_user_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2874        self.user_key_path = Some(path.as_ref().to_path_buf());
2875        self
2876    }
2877
2878    /// Set the identity heartbeat re-announcement interval.
2879    ///
2880    /// Defaults to [`IDENTITY_HEARTBEAT_INTERVAL_SECS`] (300 seconds).
2881    ///
2882    /// # Arguments
2883    ///
2884    /// * `secs` - Interval in seconds between identity re-announcements.
2885    #[must_use]
2886    pub fn with_heartbeat_interval(mut self, secs: u64) -> Self {
2887        self.heartbeat_interval_secs = Some(secs);
2888        self
2889    }
2890
2891    /// Set the identity cache TTL.
2892    ///
2893    /// Cache entries with `last_seen` older than this threshold are filtered
2894    /// from [`Agent::presence`] and [`Agent::discovered_agents`].
2895    ///
2896    /// Defaults to [`IDENTITY_TTL_SECS`] (900 seconds).
2897    ///
2898    /// # Arguments
2899    ///
2900    /// * `secs` - Time-to-live in seconds for discovered agent entries.
2901    #[must_use]
2902    pub fn with_identity_ttl(mut self, secs: u64) -> Self {
2903        self.identity_ttl_secs = Some(secs);
2904        self
2905    }
2906
2907    /// Set a custom path for the contacts file.
2908    ///
2909    /// The contacts file persists trust levels and machine records for known
2910    /// agents. Defaults to `~/.x0x/contacts.json` if not set.
2911    ///
2912    /// # Arguments
2913    ///
2914    /// * `path` - The path for the contacts file.
2915    #[must_use]
2916    pub fn with_contact_store_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2917        self.contact_store_path = Some(path.as_ref().to_path_buf());
2918        self
2919    }
2920
2921    /// Build and initialise the agent.
2922    ///
2923    /// This performs the following:
2924    /// 1. Loads or generates the machine keypair (stored in `~/.x0x/machine.key` by default)
2925    /// 2. Uses provided agent keypair or generates a fresh one
2926    /// 3. Combines both into a unified Identity
2927    ///
2928    /// The machine keypair is automatically persisted to storage.
2929    ///
2930    /// # Errors
2931    ///
2932    /// Returns an error if:
2933    /// - Machine keypair generation fails
2934    /// - Storage I/O fails
2935    /// - Keypair deserialization fails
2936    pub async fn build(self) -> error::Result<Agent> {
2937        // Determine machine keypair source
2938        let machine_keypair = if let Some(path) = self.machine_key_path {
2939            // Try to load from custom path
2940            match storage::load_machine_keypair_from(&path).await {
2941                Ok(kp) => kp,
2942                Err(_) => {
2943                    // Generate fresh keypair and save to custom path
2944                    let kp = identity::MachineKeypair::generate()?;
2945                    storage::save_machine_keypair_to(&kp, &path).await?;
2946                    kp
2947                }
2948            }
2949        } else if storage::machine_keypair_exists().await {
2950            // Load default machine keypair
2951            storage::load_machine_keypair().await?
2952        } else {
2953            // Generate and save default machine keypair
2954            let kp = identity::MachineKeypair::generate()?;
2955            storage::save_machine_keypair(&kp).await?;
2956            kp
2957        };
2958
2959        // Resolve agent keypair: explicit > path-based > default storage > generate
2960        let agent_keypair = if let Some(kp) = self.agent_keypair {
2961            // Explicit keypair takes highest precedence
2962            kp
2963        } else if let Some(path) = self.agent_key_path {
2964            // Custom path: load or generate+save
2965            match storage::load_agent_keypair_from(&path).await {
2966                Ok(kp) => kp,
2967                Err(_) => {
2968                    let kp = identity::AgentKeypair::generate()?;
2969                    storage::save_agent_keypair_to(&kp, &path).await?;
2970                    kp
2971                }
2972            }
2973        } else if storage::agent_keypair_exists().await {
2974            // Default path exists: load it
2975            storage::load_agent_keypair_default().await?
2976        } else {
2977            // No stored key: generate and persist
2978            let kp = identity::AgentKeypair::generate()?;
2979            storage::save_agent_keypair_default(&kp).await?;
2980            kp
2981        };
2982
2983        // Resolve user keypair: explicit > path-based > default storage > None (opt-in)
2984        let user_keypair = if let Some(kp) = self.user_keypair {
2985            Some(kp)
2986        } else if let Some(path) = self.user_key_path {
2987            // Custom path: load if exists, otherwise None (don't auto-generate)
2988            storage::load_user_keypair_from(&path).await.ok()
2989        } else if storage::user_keypair_exists().await {
2990            // Default path exists: load it
2991            storage::load_user_keypair().await.ok()
2992        } else {
2993            None
2994        };
2995
2996        // Build identity with optional user layer
2997        let identity = if let Some(user_kp) = user_keypair {
2998            // Try to load existing certificate, or issue a new one
2999            // IMPORTANT: Verify the cert matches the current user key
3000            let cert = if storage::agent_certificate_exists().await {
3001                match storage::load_agent_certificate().await {
3002                    Ok(c) => {
3003                        // Verify cert is for the current user - if not, re-issue
3004                        let cert_matches_user = c
3005                            .user_id()
3006                            .map(|uid| uid == user_kp.user_id())
3007                            .unwrap_or(false);
3008                        if cert_matches_user {
3009                            c
3010                        } else {
3011                            // Cert was for a different user, issue new one
3012                            let new_cert =
3013                                identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
3014                            storage::save_agent_certificate(&new_cert).await?;
3015                            new_cert
3016                        }
3017                    }
3018                    Err(_) => {
3019                        let c = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
3020                        storage::save_agent_certificate(&c).await?;
3021                        c
3022                    }
3023                }
3024            } else {
3025                let c = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
3026                storage::save_agent_certificate(&c).await?;
3027                c
3028            };
3029            identity::Identity::new_with_user(machine_keypair, agent_keypair, user_kp, cert)
3030        } else {
3031            identity::Identity::new(machine_keypair, agent_keypair)
3032        };
3033
3034        // Open bootstrap peer cache if network will be configured
3035        let bootstrap_cache = if self.network_config.is_some() {
3036            let cache_dir = self.peer_cache_dir.unwrap_or_else(|| {
3037                dirs::home_dir()
3038                    .unwrap_or_else(|| std::path::PathBuf::from("."))
3039                    .join(".x0x")
3040                    .join("peers")
3041            });
3042            let config = ant_quic::BootstrapCacheConfig::builder()
3043                .cache_dir(cache_dir)
3044                .min_peers_to_save(1)
3045                .build();
3046            match ant_quic::BootstrapCache::open(config).await {
3047                Ok(cache) => {
3048                    let cache = std::sync::Arc::new(cache);
3049                    std::sync::Arc::clone(&cache).start_maintenance();
3050                    Some(cache)
3051                }
3052                Err(e) => {
3053                    tracing::warn!("Failed to open bootstrap cache: {e}");
3054                    None
3055                }
3056            }
3057        } else {
3058            None
3059        };
3060
3061        // Create network node if configured
3062        // Pass the machine keypair so ant-quic PeerId == MachineId (identity unification)
3063        let machine_keypair = {
3064            let pk = ant_quic::MlDsaPublicKey::from_bytes(
3065                identity.machine_keypair().public_key().as_bytes(),
3066            )
3067            .map_err(|e| {
3068                error::IdentityError::Storage(std::io::Error::other(format!(
3069                    "invalid machine public key: {e}"
3070                )))
3071            })?;
3072            let sk = ant_quic::MlDsaSecretKey::from_bytes(
3073                identity.machine_keypair().secret_key().as_bytes(),
3074            )
3075            .map_err(|e| {
3076                error::IdentityError::Storage(std::io::Error::other(format!(
3077                    "invalid machine secret key: {e}"
3078                )))
3079            })?;
3080            Some((pk, sk))
3081        };
3082
3083        let network = if let Some(config) = self.network_config {
3084            let node = network::NetworkNode::new(config, bootstrap_cache.clone(), machine_keypair)
3085                .await
3086                .map_err(|e| {
3087                    error::IdentityError::Storage(std::io::Error::other(format!(
3088                        "network initialization failed: {}",
3089                        e
3090                    )))
3091                })?;
3092
3093            // Verify identity unification: ant-quic PeerId must equal MachineId
3094            debug_assert_eq!(
3095                node.peer_id().0,
3096                identity.machine_id().0,
3097                "ant-quic PeerId must equal MachineId after identity unification"
3098            );
3099
3100            Some(std::sync::Arc::new(node))
3101        } else {
3102            None
3103        };
3104
3105        // Create signing context from agent keypair for message authentication
3106        let signing_ctx = std::sync::Arc::new(gossip::SigningContext::from_keypair(
3107            identity.agent_keypair(),
3108        ));
3109
3110        // Create gossip runtime if network exists
3111        let gossip_runtime = if let Some(ref net) = network {
3112            let runtime = gossip::GossipRuntime::new(
3113                gossip::GossipConfig::default(),
3114                std::sync::Arc::clone(net),
3115                Some(signing_ctx),
3116            )
3117            .await
3118            .map_err(|e| {
3119                error::IdentityError::Storage(std::io::Error::other(format!(
3120                    "gossip runtime initialization failed: {}",
3121                    e
3122                )))
3123            })?;
3124            Some(std::sync::Arc::new(runtime))
3125        } else {
3126            None
3127        };
3128
3129        // Initialise contact store
3130        let contacts_path = self.contact_store_path.unwrap_or_else(|| {
3131            dirs::home_dir()
3132                .unwrap_or_else(|| std::path::PathBuf::from("."))
3133                .join(".x0x")
3134                .join("contacts.json")
3135        });
3136        let contact_store = std::sync::Arc::new(tokio::sync::RwLock::new(
3137            contacts::ContactStore::new(contacts_path),
3138        ));
3139
3140        // Wrap bootstrap cache with gossip coordinator adapter (zero duplication).
3141        let gossip_cache_adapter = bootstrap_cache.as_ref().map(|cache| {
3142            saorsa_gossip_coordinator::GossipCacheAdapter::new(std::sync::Arc::clone(cache))
3143        });
3144
3145        // Initialize direct messaging infrastructure
3146        let direct_messaging = std::sync::Arc::new(direct::DirectMessaging::new());
3147
3148        // Create presence wrapper if network exists
3149        let presence = if let Some(ref net) = network {
3150            let peer_id = saorsa_gossip_transport::GossipTransport::local_peer_id(net.as_ref());
3151            let pw = presence::PresenceWrapper::new(
3152                peer_id,
3153                std::sync::Arc::clone(net),
3154                presence::PresenceConfig::default(),
3155                bootstrap_cache.clone(),
3156            )
3157            .map_err(|e| {
3158                error::IdentityError::Storage(std::io::Error::other(format!(
3159                    "presence initialization failed: {}",
3160                    e
3161                )))
3162            })?;
3163            let pw_arc = std::sync::Arc::new(pw);
3164            // Wire presence into gossip runtime for Bulk dispatch
3165            if let Some(ref rt) = gossip_runtime {
3166                rt.set_presence(std::sync::Arc::clone(&pw_arc));
3167            }
3168            Some(pw_arc)
3169        } else {
3170            None
3171        };
3172
3173        Ok(Agent {
3174            identity: std::sync::Arc::new(identity),
3175            network,
3176            gossip_runtime,
3177            bootstrap_cache,
3178            gossip_cache_adapter,
3179            identity_discovery_cache: std::sync::Arc::new(tokio::sync::RwLock::new(
3180                std::collections::HashMap::new(),
3181            )),
3182            identity_listener_started: std::sync::atomic::AtomicBool::new(false),
3183            heartbeat_interval_secs: self
3184                .heartbeat_interval_secs
3185                .unwrap_or(IDENTITY_HEARTBEAT_INTERVAL_SECS),
3186            identity_ttl_secs: self.identity_ttl_secs.unwrap_or(IDENTITY_TTL_SECS),
3187            heartbeat_handle: tokio::sync::Mutex::new(None),
3188            rendezvous_advertised: std::sync::atomic::AtomicBool::new(false),
3189            contact_store,
3190            direct_messaging,
3191            direct_listener_started: std::sync::atomic::AtomicBool::new(false),
3192            presence,
3193        })
3194    }
3195}
3196
3197/// Handle for interacting with a collaborative task list.
3198///
3199/// Provides a safe, concurrent interface to a TaskList backed by
3200/// CRDT synchronization. All operations are async and return Results.
3201///
3202/// # Example
3203///
3204/// ```ignore
3205/// let handle = agent.create_task_list("My List", "topic").await?;
3206/// let task_id = handle.add_task("Write docs".to_string(), "API docs".to_string()).await?;
3207/// handle.claim_task(task_id).await?;
3208/// handle.complete_task(task_id).await?;
3209/// ```
3210#[derive(Clone)]
3211pub struct TaskListHandle {
3212    sync: std::sync::Arc<crdt::TaskListSync>,
3213    agent_id: identity::AgentId,
3214    peer_id: saorsa_gossip_types::PeerId,
3215}
3216
3217impl std::fmt::Debug for TaskListHandle {
3218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3219        f.debug_struct("TaskListHandle")
3220            .field("agent_id", &self.agent_id)
3221            .field("peer_id", &self.peer_id)
3222            .finish_non_exhaustive()
3223    }
3224}
3225
3226impl TaskListHandle {
3227    /// Add a new task to the list.
3228    ///
3229    /// # Arguments
3230    ///
3231    /// * `title` - Task title
3232    /// * `description` - Task description
3233    ///
3234    /// # Returns
3235    ///
3236    /// The TaskId of the created task.
3237    ///
3238    /// # Errors
3239    ///
3240    /// Returns an error if the task cannot be added.
3241    pub async fn add_task(
3242        &self,
3243        title: String,
3244        description: String,
3245    ) -> error::Result<crdt::TaskId> {
3246        let (task_id, delta) = {
3247            let mut list = self.sync.write().await;
3248            let seq = list.next_seq();
3249            let task_id = crdt::TaskId::new(&title, &self.agent_id, seq);
3250            let metadata = crdt::TaskMetadata::new(title, description, 128, self.agent_id, seq);
3251            let task = crdt::TaskItem::new(task_id, metadata, self.peer_id);
3252            list.add_task(task.clone(), self.peer_id, seq)
3253                .map_err(|e| {
3254                    error::IdentityError::Storage(std::io::Error::other(format!(
3255                        "add_task failed: {}",
3256                        e
3257                    )))
3258                })?;
3259            let tag = (self.peer_id, seq);
3260            let delta = crdt::TaskListDelta::for_add(task_id, task, tag, list.current_version());
3261            (task_id, delta)
3262        };
3263        // Best-effort replication: local mutation succeeded regardless
3264        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3265            tracing::warn!("failed to publish add_task delta: {}", e);
3266        }
3267        Ok(task_id)
3268    }
3269
3270    /// Claim a task in the list.
3271    ///
3272    /// # Arguments
3273    ///
3274    /// * `task_id` - ID of the task to claim
3275    ///
3276    /// # Errors
3277    ///
3278    /// Returns an error if the task cannot be claimed.
3279    pub async fn claim_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
3280        let delta = {
3281            let mut list = self.sync.write().await;
3282            let seq = list.next_seq();
3283            list.claim_task(&task_id, self.agent_id, self.peer_id, seq)
3284                .map_err(|e| {
3285                    error::IdentityError::Storage(std::io::Error::other(format!(
3286                        "claim_task failed: {}",
3287                        e
3288                    )))
3289                })?;
3290            // Include full task so receivers can upsert if add hasn't arrived yet
3291            let full_task = list
3292                .get_task(&task_id)
3293                .ok_or_else(|| {
3294                    error::IdentityError::Storage(std::io::Error::other(
3295                        "task disappeared after claim",
3296                    ))
3297                })?
3298                .clone();
3299            crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
3300        };
3301        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3302            tracing::warn!("failed to publish claim_task delta: {}", e);
3303        }
3304        Ok(())
3305    }
3306
3307    /// Complete a task in the list.
3308    ///
3309    /// # Arguments
3310    ///
3311    /// * `task_id` - ID of the task to complete
3312    ///
3313    /// # Errors
3314    ///
3315    /// Returns an error if the task cannot be completed.
3316    pub async fn complete_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
3317        let delta = {
3318            let mut list = self.sync.write().await;
3319            let seq = list.next_seq();
3320            list.complete_task(&task_id, self.agent_id, self.peer_id, seq)
3321                .map_err(|e| {
3322                    error::IdentityError::Storage(std::io::Error::other(format!(
3323                        "complete_task failed: {}",
3324                        e
3325                    )))
3326                })?;
3327            let full_task = list
3328                .get_task(&task_id)
3329                .ok_or_else(|| {
3330                    error::IdentityError::Storage(std::io::Error::other(
3331                        "task disappeared after complete",
3332                    ))
3333                })?
3334                .clone();
3335            crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
3336        };
3337        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3338            tracing::warn!("failed to publish complete_task delta: {}", e);
3339        }
3340        Ok(())
3341    }
3342
3343    /// List all tasks in their current order.
3344    ///
3345    /// # Returns
3346    ///
3347    /// A vector of `TaskSnapshot` representing the current state.
3348    ///
3349    /// # Errors
3350    ///
3351    /// Returns an error if the task list cannot be read.
3352    pub async fn list_tasks(&self) -> error::Result<Vec<TaskSnapshot>> {
3353        let list = self.sync.read().await;
3354        let tasks = list.tasks_ordered();
3355        Ok(tasks
3356            .into_iter()
3357            .map(|task| TaskSnapshot {
3358                id: *task.id(),
3359                title: task.title().to_string(),
3360                description: task.description().to_string(),
3361                state: task.current_state(),
3362                assignee: task.assignee().copied(),
3363                owner: None,
3364                priority: task.priority(),
3365            })
3366            .collect())
3367    }
3368
3369    /// Reorder tasks in the list.
3370    ///
3371    /// # Arguments
3372    ///
3373    /// * `task_ids` - New ordering of task IDs
3374    ///
3375    /// # Errors
3376    ///
3377    /// Returns an error if reordering fails.
3378    pub async fn reorder(&self, task_ids: Vec<crdt::TaskId>) -> error::Result<()> {
3379        let delta = {
3380            let mut list = self.sync.write().await;
3381            list.reorder(task_ids.clone(), self.peer_id).map_err(|e| {
3382                error::IdentityError::Storage(std::io::Error::other(format!(
3383                    "reorder failed: {}",
3384                    e
3385                )))
3386            })?;
3387            crdt::TaskListDelta::for_reorder(task_ids, list.current_version())
3388        };
3389        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3390            tracing::warn!("failed to publish reorder delta: {}", e);
3391        }
3392        Ok(())
3393    }
3394}
3395
3396// ---------------------------------------------------------------------------
3397// KvStore API
3398// ---------------------------------------------------------------------------
3399
3400impl Agent {
3401    /// Create a new key-value store.
3402    ///
3403    /// The store is automatically synchronized to all peers subscribed
3404    /// to the same `topic` via gossip delta propagation.
3405    ///
3406    /// # Errors
3407    ///
3408    /// Returns an error if the gossip runtime is not initialized.
3409    pub async fn create_kv_store(&self, name: &str, topic: &str) -> error::Result<KvStoreHandle> {
3410        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3411            error::IdentityError::Storage(std::io::Error::other(
3412                "gossip runtime not initialized - configure agent with network first",
3413            ))
3414        })?;
3415
3416        let peer_id = runtime.peer_id();
3417        let store_id = kv::KvStoreId::from_content(name, &self.agent_id());
3418        let store = kv::KvStore::new(
3419            store_id,
3420            name.to_string(),
3421            self.agent_id(),
3422            kv::AccessPolicy::Signed,
3423        );
3424
3425        let sync = kv::KvStoreSync::new(
3426            store,
3427            std::sync::Arc::clone(runtime.pubsub()),
3428            topic.to_string(),
3429            30,
3430        )
3431        .map_err(|e| {
3432            error::IdentityError::Storage(std::io::Error::other(format!(
3433                "kv store sync creation failed: {e}",
3434            )))
3435        })?;
3436
3437        let sync = std::sync::Arc::new(sync);
3438        sync.start().await.map_err(|e| {
3439            error::IdentityError::Storage(std::io::Error::other(format!(
3440                "kv store sync start failed: {e}",
3441            )))
3442        })?;
3443
3444        Ok(KvStoreHandle {
3445            sync,
3446            agent_id: self.agent_id(),
3447            peer_id,
3448        })
3449    }
3450
3451    /// Join an existing key-value store by topic.
3452    ///
3453    /// Creates an empty store that will be populated via delta sync
3454    /// from peers already sharing the topic. The access policy will
3455    /// be learned from the first full delta received from the owner.
3456    ///
3457    /// # Errors
3458    ///
3459    /// Returns an error if the gossip runtime is not initialized.
3460    pub async fn join_kv_store(&self, topic: &str) -> error::Result<KvStoreHandle> {
3461        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3462            error::IdentityError::Storage(std::io::Error::other(
3463                "gossip runtime not initialized - configure agent with network first",
3464            ))
3465        })?;
3466
3467        let peer_id = runtime.peer_id();
3468        let store_id = kv::KvStoreId::from_content(topic, &self.agent_id());
3469        // Use Encrypted as the most permissive default — the actual policy
3470        // will be set when the first delta from the owner arrives.
3471        let store = kv::KvStore::new(
3472            store_id,
3473            String::new(),
3474            self.agent_id(),
3475            kv::AccessPolicy::Encrypted {
3476                group_id: Vec::new(),
3477            },
3478        );
3479
3480        let sync = kv::KvStoreSync::new(
3481            store,
3482            std::sync::Arc::clone(runtime.pubsub()),
3483            topic.to_string(),
3484            30,
3485        )
3486        .map_err(|e| {
3487            error::IdentityError::Storage(std::io::Error::other(format!(
3488                "kv store sync creation failed: {e}",
3489            )))
3490        })?;
3491
3492        let sync = std::sync::Arc::new(sync);
3493        sync.start().await.map_err(|e| {
3494            error::IdentityError::Storage(std::io::Error::other(format!(
3495                "kv store sync start failed: {e}",
3496            )))
3497        })?;
3498
3499        Ok(KvStoreHandle {
3500            sync,
3501            agent_id: self.agent_id(),
3502            peer_id,
3503        })
3504    }
3505}
3506
3507/// Handle for interacting with a replicated key-value store.
3508///
3509/// Provides async methods for putting, getting, and removing entries.
3510/// Changes are automatically replicated to peers via gossip.
3511#[derive(Clone)]
3512pub struct KvStoreHandle {
3513    sync: std::sync::Arc<kv::KvStoreSync>,
3514    agent_id: identity::AgentId,
3515    peer_id: saorsa_gossip_types::PeerId,
3516}
3517
3518impl std::fmt::Debug for KvStoreHandle {
3519    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3520        f.debug_struct("KvStoreHandle")
3521            .field("agent_id", &self.agent_id)
3522            .field("peer_id", &self.peer_id)
3523            .finish_non_exhaustive()
3524    }
3525}
3526
3527impl KvStoreHandle {
3528    /// Put a key-value pair into the store.
3529    ///
3530    /// If the key already exists, the value is updated. Changes are
3531    /// automatically replicated to peers via gossip.
3532    ///
3533    /// # Errors
3534    ///
3535    /// Returns an error if the value exceeds the maximum inline size (64 KB).
3536    pub async fn put(
3537        &self,
3538        key: String,
3539        value: Vec<u8>,
3540        content_type: String,
3541    ) -> error::Result<()> {
3542        let delta = {
3543            let mut store = self.sync.write().await;
3544            store
3545                .put(
3546                    key.clone(),
3547                    value.clone(),
3548                    content_type.clone(),
3549                    self.peer_id,
3550                )
3551                .map_err(|e| {
3552                    error::IdentityError::Storage(std::io::Error::other(format!(
3553                        "kv put failed: {e}",
3554                    )))
3555                })?;
3556            let entry = store.get(&key).cloned();
3557            let version = store.current_version();
3558            match entry {
3559                Some(e) => {
3560                    kv::KvStoreDelta::for_put(key, e, (self.peer_id, store.next_seq()), version)
3561                }
3562                None => return Ok(()), // shouldn't happen after successful put
3563            }
3564        };
3565        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3566            tracing::warn!("failed to publish kv put delta: {e}");
3567        }
3568        Ok(())
3569    }
3570
3571    /// Get a value by key.
3572    ///
3573    /// Returns `None` if the key does not exist or has been removed.
3574    ///
3575    /// # Errors
3576    ///
3577    /// Returns an error if the store cannot be read.
3578    pub async fn get(&self, key: &str) -> error::Result<Option<KvEntrySnapshot>> {
3579        let store = self.sync.read().await;
3580        Ok(store.get(key).map(|e| KvEntrySnapshot {
3581            key: e.key.clone(),
3582            value: e.value.clone(),
3583            content_hash: hex::encode(e.content_hash),
3584            content_type: e.content_type.clone(),
3585            metadata: e.metadata.clone(),
3586            created_at: e.created_at,
3587            updated_at: e.updated_at,
3588        }))
3589    }
3590
3591    /// Remove a key from the store.
3592    ///
3593    /// # Errors
3594    ///
3595    /// Returns an error if the key does not exist.
3596    pub async fn remove(&self, key: &str) -> error::Result<()> {
3597        let delta = {
3598            let mut store = self.sync.write().await;
3599            store.remove(key).map_err(|e| {
3600                error::IdentityError::Storage(std::io::Error::other(format!(
3601                    "kv remove failed: {e}",
3602                )))
3603            })?;
3604            let mut d = kv::KvStoreDelta::new(store.current_version());
3605            d.removed
3606                .insert(key.to_string(), std::collections::HashSet::new());
3607            d
3608        };
3609        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3610            tracing::warn!("failed to publish kv remove delta: {e}");
3611        }
3612        Ok(())
3613    }
3614
3615    /// List all active keys in the store.
3616    ///
3617    /// # Errors
3618    ///
3619    /// Returns an error if the store cannot be read.
3620    pub async fn keys(&self) -> error::Result<Vec<KvEntrySnapshot>> {
3621        let store = self.sync.read().await;
3622        Ok(store
3623            .active_entries()
3624            .into_iter()
3625            .map(|e| KvEntrySnapshot {
3626                key: e.key.clone(),
3627                value: e.value.clone(),
3628                content_hash: hex::encode(e.content_hash),
3629                content_type: e.content_type.clone(),
3630                metadata: e.metadata.clone(),
3631                created_at: e.created_at,
3632                updated_at: e.updated_at,
3633            })
3634            .collect())
3635    }
3636
3637    /// Get the store name.
3638    ///
3639    /// # Errors
3640    ///
3641    /// Returns an error if the store cannot be read.
3642    pub async fn name(&self) -> error::Result<String> {
3643        let store = self.sync.read().await;
3644        Ok(store.name().to_string())
3645    }
3646}
3647
/// Read-only snapshot of a KvStore entry.
///
/// Returned by `KvStoreHandle::get` and `KvStoreHandle::keys`. The snapshot
/// is an owned copy, so it does not hold the store lock after it is returned.
#[derive(Debug, Clone, serde::Serialize)]
pub struct KvEntrySnapshot {
    /// The key.
    pub key: String,
    /// The value bytes (an owned copy of the stored value).
    pub value: Vec<u8>,
    /// BLAKE3 hash of the value (hex-encoded via `hex::encode`).
    pub content_hash: String,
    /// Content type (MIME), as supplied to `KvStoreHandle::put`.
    pub content_type: String,
    /// User metadata.
    pub metadata: std::collections::HashMap<String, String>,
    /// Unix milliseconds when created.
    pub created_at: u64,
    /// Unix milliseconds when last updated.
    pub updated_at: u64,
}
3666
/// Read-only snapshot of a task's current state.
///
/// This is returned by `TaskListHandle::list_tasks()` and hides CRDT
/// internals, providing a clean API surface. All fields are owned copies
/// taken while the list's read lock was held.
#[derive(Debug, Clone)]
pub struct TaskSnapshot {
    /// Unique task identifier.
    pub id: crdt::TaskId,
    /// Task title.
    pub title: String,
    /// Task description.
    pub description: String,
    /// Current checkbox state (Empty, Claimed, or Done).
    pub state: crdt::CheckboxState,
    /// Agent assigned to this task (if any).
    pub assignee: Option<identity::AgentId>,
    /// Human owner of the agent that created this task (if known).
    ///
    /// `TaskListHandle::list_tasks` currently always sets this to `None`.
    pub owner: Option<identity::UserId>,
    /// Task priority (0-255, higher = more important).
    ///
    /// Tasks created through `TaskListHandle::add_task` get priority 128.
    pub priority: u8,
}
3688
/// Version of the x0x crate, taken from `CARGO_PKG_VERSION` at compile time.
///
/// This tracks the package release version from `Cargo.toml`; it is not a
/// separately maintained wire-protocol number.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");

/// The name. Three bytes. A palindrome. A philosophy.
///
/// The unit tests enforce these properties: `NAME` reads the same reversed,
/// is exactly three bytes long, and contains only ASCII alphanumerics.
pub const NAME: &str = "x0x";
3694
3695#[cfg(test)]
3696mod tests {
3697    use super::*;
3698
3699    #[test]
3700    fn name_is_palindrome() {
3701        let name = NAME;
3702        let reversed: String = name.chars().rev().collect();
3703        assert_eq!(name, reversed, "x0x must be a palindrome");
3704    }
3705
3706    #[test]
3707    fn name_is_three_bytes() {
3708        assert_eq!(NAME.len(), 3, "x0x must be exactly three bytes");
3709    }
3710
3711    #[test]
3712    fn name_is_ai_native() {
3713        // No uppercase, no spaces, no special chars that conflict
3714        // with shell, YAML, Markdown, or URL encoding
3715        assert!(NAME.chars().all(|c| c.is_ascii_alphanumeric()));
3716    }
3717
3718    #[tokio::test]
3719    async fn agent_creates() {
3720        let agent = Agent::new().await;
3721        assert!(agent.is_ok());
3722    }
3723
3724    #[tokio::test]
3725    async fn agent_joins_network() {
3726        let agent = Agent::new().await.unwrap();
3727        assert!(agent.join_network().await.is_ok());
3728    }
3729
3730    #[tokio::test]
3731    async fn agent_subscribes() {
3732        let agent = Agent::new().await.unwrap();
3733        // Currently returns error - will be implemented in Task 3
3734        assert!(agent.subscribe("test-topic").await.is_err());
3735    }
3736
3737    #[tokio::test]
3738    async fn identity_announcement_machine_signature_verifies() {
3739        let agent = Agent::builder()
3740            .with_network_config(network::NetworkConfig::default())
3741            .build()
3742            .await
3743            .unwrap();
3744
3745        let announcement = agent.build_identity_announcement(false, false).unwrap();
3746        assert_eq!(announcement.agent_id, agent.agent_id());
3747        assert_eq!(announcement.machine_id, agent.machine_id());
3748        assert!(announcement.user_id.is_none());
3749        assert!(announcement.agent_certificate.is_none());
3750        assert!(announcement.verify().is_ok());
3751    }
3752
3753    #[tokio::test]
3754    async fn identity_announcement_requires_human_consent() {
3755        let agent = Agent::builder()
3756            .with_network_config(network::NetworkConfig::default())
3757            .build()
3758            .await
3759            .unwrap();
3760
3761        let err = agent.build_identity_announcement(true, false).unwrap_err();
3762        assert!(
3763            err.to_string().contains("explicit human consent"),
3764            "unexpected error: {err}"
3765        );
3766    }
3767
3768    #[tokio::test]
3769    async fn identity_announcement_with_user_requires_user_identity() {
3770        let agent = Agent::builder()
3771            .with_network_config(network::NetworkConfig::default())
3772            .build()
3773            .await
3774            .unwrap();
3775
3776        let err = agent.build_identity_announcement(true, true).unwrap_err();
3777        assert!(
3778            err.to_string().contains("no user identity is configured"),
3779            "unexpected error: {err}"
3780        );
3781    }
3782
3783    #[tokio::test]
3784    async fn announce_identity_populates_discovery_cache() {
3785        let user_key = identity::UserKeypair::generate().unwrap();
3786        let agent = Agent::builder()
3787            .with_network_config(network::NetworkConfig::default())
3788            .with_user_key(user_key)
3789            .build()
3790            .await
3791            .unwrap();
3792
3793        agent.announce_identity(true, true).await.unwrap();
3794        let discovered = agent.discovered_agent(agent.agent_id()).await.unwrap();
3795        let entry = discovered.expect("agent should discover its own announcement");
3796
3797        assert_eq!(entry.agent_id, agent.agent_id());
3798        assert_eq!(entry.machine_id, agent.machine_id());
3799        assert_eq!(entry.user_id, agent.user_id());
3800    }
3801
3802    /// An announcement without NAT fields (as produced by old nodes) should still
3803    /// deserialise correctly via bincode — new fields are `Option` so `None` (0x00)
3804    /// is a valid encoding.
3805    #[test]
3806    fn identity_announcement_backward_compat_no_nat_fields() {
3807        use identity::{AgentId, MachineId};
3808
3809        // Build an announcement that omits the nat_* fields by serializing an old-style
3810        // struct that matches the pre-1.3 wire format.
3811        #[derive(serde::Serialize, serde::Deserialize)]
3812        struct OldIdentityAnnouncementUnsigned {
3813            agent_id: AgentId,
3814            machine_id: MachineId,
3815            user_id: Option<identity::UserId>,
3816            agent_certificate: Option<identity::AgentCertificate>,
3817            machine_public_key: Vec<u8>,
3818            addresses: Vec<std::net::SocketAddr>,
3819            announced_at: u64,
3820        }
3821
3822        let agent_id = AgentId([1u8; 32]);
3823        let machine_id = MachineId([2u8; 32]);
3824        let old = OldIdentityAnnouncementUnsigned {
3825            agent_id,
3826            machine_id,
3827            user_id: None,
3828            agent_certificate: None,
3829            machine_public_key: vec![0u8; 10],
3830            addresses: Vec::new(),
3831            announced_at: 1234,
3832        };
3833        let bytes = bincode::serialize(&old).expect("serialize old announcement");
3834
3835        // Attempt to deserialize as the new struct — this tests that the new fields
3836        // (which are Option<T>) do NOT break deserialization of the old format.
3837        // Note: bincode 1.x is not self-describing, so adding fields to a struct DOES
3838        // change the wire format.  This test documents the expected behavior.
3839        // Old format -> new struct: will fail because new struct has more fields.
3840        // New format -> old struct: will have trailing bytes.
3841        // This is acceptable — we document the protocol change.
3842        let result = bincode::deserialize::<IdentityAnnouncementUnsigned>(&bytes);
3843        // Old nodes produce shorter messages; new nodes cannot decode them as new structs.
3844        // This confirms the protocol is not transparent — nodes must upgrade together.
3845        assert!(
3846            result.is_err(),
3847            "Old-format announcement should not decode as new struct (protocol upgrade required)"
3848        );
3849    }
3850
3851    /// A new announcement with all NAT fields set round-trips through bincode.
3852    #[test]
3853    fn identity_announcement_nat_fields_round_trip() {
3854        use identity::{AgentId, MachineId};
3855
3856        let unsigned = IdentityAnnouncementUnsigned {
3857            agent_id: AgentId([1u8; 32]),
3858            machine_id: MachineId([2u8; 32]),
3859            user_id: None,
3860            agent_certificate: None,
3861            machine_public_key: vec![0u8; 10],
3862            addresses: Vec::new(),
3863            announced_at: 9999,
3864            nat_type: Some("FullCone".to_string()),
3865            can_receive_direct: Some(true),
3866            is_relay: Some(false),
3867            is_coordinator: Some(true),
3868        };
3869        let bytes = bincode::serialize(&unsigned).expect("serialize");
3870        let decoded: IdentityAnnouncementUnsigned =
3871            bincode::deserialize(&bytes).expect("deserialize");
3872        assert_eq!(decoded.nat_type.as_deref(), Some("FullCone"));
3873        assert_eq!(decoded.can_receive_direct, Some(true));
3874        assert_eq!(decoded.is_relay, Some(false));
3875        assert_eq!(decoded.is_coordinator, Some(true));
3876    }
3877
3878    /// An announcement with None for all NAT fields (e.g. network not started)
3879    /// round-trips correctly.
3880    #[test]
3881    fn identity_announcement_no_nat_fields_round_trip() {
3882        use identity::{AgentId, MachineId};
3883
3884        let unsigned = IdentityAnnouncementUnsigned {
3885            agent_id: AgentId([3u8; 32]),
3886            machine_id: MachineId([4u8; 32]),
3887            user_id: None,
3888            agent_certificate: None,
3889            machine_public_key: vec![0u8; 10],
3890            addresses: Vec::new(),
3891            announced_at: 42,
3892            nat_type: None,
3893            can_receive_direct: None,
3894            is_relay: None,
3895            is_coordinator: None,
3896        };
3897        let bytes = bincode::serialize(&unsigned).expect("serialize");
3898        let decoded: IdentityAnnouncementUnsigned =
3899            bincode::deserialize(&bytes).expect("deserialize");
3900        assert!(decoded.nat_type.is_none());
3901        assert!(decoded.can_receive_direct.is_none());
3902        assert!(decoded.is_relay.is_none());
3903        assert!(decoded.is_coordinator.is_none());
3904    }
3905}