Skip to main content

x0x/
lib.rs

1#![allow(clippy::unwrap_used)]
2#![allow(clippy::expect_used)]
3#![allow(missing_docs)]
4
5//! # x0x
6//!
7//! Agent-to-agent gossip network for AI systems.
8//!
9//! Named after a tic-tac-toe sequence — X, zero, X — inspired by the
10//! *WarGames* insight that adversarial games between equally matched
11//! opponents always end in a draw. The only winning move is not to play.
12//!
13//! x0x applies this principle to AI-human relations: there is no winner
14//! in an adversarial framing, so the rational strategy is cooperation.
15//!
16//! Built on [saorsa-gossip](https://github.com/saorsa-labs/saorsa-gossip)
17//! and [ant-quic](https://github.com/saorsa-labs/ant-quic) by
18//! [Saorsa Labs](https://saorsalabs.com). *Saorsa* is Scottish Gaelic
19//! for **freedom**.
20//!
21//! ## Quick Start
22//!
23//! ```rust,no_run
24//! use x0x::Agent;
25//!
26//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
27//! // Create an agent with default configuration
28//! // This automatically connects to 6 global bootstrap nodes
29//! let agent = Agent::builder()
30//!     .build()
31//!     .await?;
32//!
33//! // Join the x0x network
34//! agent.join_network().await?;
35//!
36//! // Subscribe to a topic and receive messages
37//! let mut rx = agent.subscribe("coordination").await?;
38//! while let Some(msg) = rx.recv().await {
39//!     println!("topic: {:?}, payload: {:?}", msg.topic, msg.payload);
40//! }
41//! # Ok(())
42//! # }
43//! ```
44//!
45//! ## Bootstrap Nodes
46//!
47//! Agents automatically connect to Saorsa Labs' global bootstrap network:
48//! - NYC, US · SFO, US · Helsinki, FI
49//! - Nuremberg, DE · Singapore, SG · Tokyo, JP
50//!
51//! These nodes provide initial peer discovery and NAT traversal.
52
53/// Error types for x0x identity and network operations.
54pub mod error;
55
56/// Core identity types for x0x agents.
57///
58/// This module provides the cryptographic identity foundation for x0x:
59/// - [`crate::identity::MachineId`]: Machine-pinned identity for QUIC authentication
60/// - [`crate::identity::AgentId`]: Portable agent identity for cross-machine persistence
61pub mod identity;
62
63/// Key storage serialization for x0x identities.
64///
65/// This module provides serialization and deserialization functions for
66/// persistent storage of MachineKeypair and AgentKeypair.
67pub mod storage;
68
69/// Bootstrap node discovery and connection.
70///
71/// This module handles initial connection to bootstrap nodes with
72/// exponential backoff retry logic and peer cache integration.
73pub mod bootstrap;
74/// Network transport layer for x0x.
75pub mod network;
76
77/// Contact store with trust levels for message filtering.
78pub mod contacts;
79
80/// Trust evaluation for `(identity, machine)` pairs.
81///
82/// The [`trust::TrustEvaluator`] combines an agent's trust level with its
83/// identity type and machine records to produce a [`trust::TrustDecision`].
84pub mod trust;
85
86/// Agent-to-agent connectivity helpers.
87///
88/// Provides `ReachabilityInfo` (built from a `DiscoveredAgent`) and
89/// `ConnectOutcome` for the result of `connect_to_agent()`.
90pub mod connectivity;
91
92/// Gossip overlay networking for x0x.
93pub mod gossip;
94
95/// CRDT-based collaborative task lists.
96pub mod crdt;
97
98/// CRDT-backed key-value store.
99pub mod kv;
100
101/// High-level group management (MLS + KvStore + gossip).
102pub mod groups;
103
104/// MLS (Messaging Layer Security) group encryption.
105pub mod mls;
106
107/// Direct agent-to-agent messaging.
108///
109/// Point-to-point communication that bypasses gossip for private,
110/// efficient, reliable delivery between connected agents.
111pub mod direct;
112
113/// Self-update system with ML-DSA-65 signature verification and staged rollout.
114pub mod upgrade;
115
116/// File transfer protocol types and state management.
117pub mod files;
118
119/// Shared API endpoint registry consumed by both x0xd and the x0x CLI.
120pub mod api;
121
122/// CLI infrastructure and command implementations.
123pub mod cli;
124
125// Re-export key gossip types (including new pubsub components)
126pub use gossip::{
127    GossipConfig, GossipRuntime, PubSubManager, PubSubMessage, SigningContext, Subscription,
128};
129
130// Re-export direct messaging types
131pub use direct::{DirectMessage, DirectMessageReceiver, DirectMessaging};
132
133// Import Membership trait for HyParView join() method
134use saorsa_gossip_membership::Membership as _;
135
136/// The core agent that participates in the x0x gossip network.
137///
138/// Each agent is a peer — there is no client/server distinction.
139/// Agents discover each other through gossip and communicate
140/// via epidemic broadcast.
141///
142/// An Agent wraps an [`identity::Identity`] that provides:
143/// - `machine_id`: Tied to this computer (for QUIC transport authentication)
144/// - `agent_id`: Portable across machines (for agent persistence)
145///
146/// # Example
147///
148/// ```ignore
149/// use x0x::Agent;
150///
151/// let agent = Agent::builder()
152///     .build()
153///     .await?;
154///
155/// println!("Agent ID: {}", agent.agent_id());
156/// ```
157#[derive(Debug)]
158pub struct Agent {
159    identity: std::sync::Arc<identity::Identity>,
160    /// The network node for P2P communication.
161    #[allow(dead_code)]
162    network: Option<std::sync::Arc<network::NetworkNode>>,
163    /// The gossip runtime for pub/sub messaging.
164    gossip_runtime: Option<std::sync::Arc<gossip::GossipRuntime>>,
165    /// Bootstrap peer cache for quality-based peer selection across restarts.
166    bootstrap_cache: Option<std::sync::Arc<ant_quic::BootstrapCache>>,
167    /// Cache of discovered agents from identity announcements.
168    identity_discovery_cache: std::sync::Arc<
169        tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
170    >,
171    /// Ensures identity discovery listener is spawned once.
172    identity_listener_started: std::sync::atomic::AtomicBool,
173    /// How often to re-announce identity (seconds).
174    heartbeat_interval_secs: u64,
175    /// How long before a cache entry is filtered out (seconds).
176    identity_ttl_secs: u64,
177    /// Handle for the running heartbeat task, if started.
178    heartbeat_handle: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
179    /// Whether a rendezvous `ProviderSummary` advertisement is active.
180    rendezvous_advertised: std::sync::atomic::AtomicBool,
181    /// Contact store for trust evaluation of incoming identity announcements.
182    contact_store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>,
183    /// Direct messaging infrastructure for point-to-point communication.
184    direct_messaging: std::sync::Arc<direct::DirectMessaging>,
185}
186
/// One pub/sub message delivered from the gossip network.
#[derive(Clone, Debug)]
pub struct Message {
    /// Identifier of the agent that originally published the message.
    pub origin: String,
    /// Raw, application-defined message bytes.
    pub payload: Vec<u8>,
    /// Name of the topic the message was published on.
    pub topic: String,
}
197
/// Gossip topic reserved for machine-signed identity announcements
/// (the legacy, non-sharded broadcast topic).
pub const IDENTITY_ANNOUNCE_TOPIC: &str = "x0x.identity.announce.v1";
200
201/// Return the shard-specific gossip topic for the given `agent_id`.
202///
203/// Each agent publishes identity announcements to a deterministic shard topic
204/// (`x0x.identity.shard.<u16>`) derived from its agent ID, in addition to the
205/// legacy broadcast topic.  This distributes announcements across 65,536 shards
206/// so that at scale not every node is forced to receive every announcement.
207///
208/// The shard is computed with `saorsa_gossip_rendezvous::calculate_shard`, which
209/// applies BLAKE3(`"saorsa-rendezvous" || agent_id`) and takes the low 16 bits.
210#[must_use]
211pub fn shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
212    let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
213    format!("x0x.identity.shard.{shard}")
214}
215
/// Prefix of the gossip topics that carry rendezvous `ProviderSummary`
/// advertisements; the shard number is appended after a `.` separator.
pub const RENDEZVOUS_SHARD_TOPIC_PREFIX: &str = "x0x.rendezvous.shard";
218
219/// Return the rendezvous shard gossip topic for the given `agent_id`.
220///
221/// Agents publish [`saorsa_gossip_rendezvous::ProviderSummary`] records to this
222/// topic so that seekers can find them even when the two peers have never been
223/// on the same gossip overlay partition.
224#[must_use]
225pub fn rendezvous_shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
226    let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
227    format!("{RENDEZVOUS_SHARD_TOPIC_PREFIX}.{shard}")
228}
229
/// Default period, in seconds, between identity heartbeat re-announcements
/// (five minutes).
pub const IDENTITY_HEARTBEAT_INTERVAL_SECS: u64 = 5 * 60;

/// Default lifetime, in seconds, of a discovered-agent cache entry (fifteen
/// minutes, i.e. three default heartbeat intervals).
///
/// An entry that has not been refreshed within this window is filtered out of
/// [`Agent::presence`] and [`Agent::discovered_agents`].
pub const IDENTITY_TTL_SECS: u64 = 15 * 60;
238
239#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
240struct IdentityAnnouncementUnsigned {
241    agent_id: identity::AgentId,
242    machine_id: identity::MachineId,
243    user_id: Option<identity::UserId>,
244    agent_certificate: Option<identity::AgentCertificate>,
245    machine_public_key: Vec<u8>,
246    addresses: Vec<std::net::SocketAddr>,
247    announced_at: u64,
248    /// NAT type string (e.g. "FullCone", "Symmetric", "Unknown").
249    nat_type: Option<String>,
250    /// Whether the machine can receive direct inbound connections.
251    can_receive_direct: Option<bool>,
252    /// Whether the machine is currently relaying traffic for others.
253    is_relay: Option<bool>,
254    /// Whether the machine is coordinating NAT traversal for peers.
255    is_coordinator: Option<bool>,
256}
257
258/// Signed identity announcement broadcast by agents.
259///
260/// The outer pub/sub envelope is agent-signed (v2 message format), and this
261/// payload is machine-signed to bind the daemon's PQC key to the announcement.
262#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
263pub struct IdentityAnnouncement {
264    /// Portable agent identity.
265    pub agent_id: identity::AgentId,
266    /// Machine identity for the daemon process.
267    pub machine_id: identity::MachineId,
268    /// Optional human identity (only when explicitly consented).
269    pub user_id: Option<identity::UserId>,
270    /// Optional user->agent certificate.
271    pub agent_certificate: Option<identity::AgentCertificate>,
272    /// Machine ML-DSA-65 public key bytes.
273    pub machine_public_key: Vec<u8>,
274    /// Machine ML-DSA-65 signature over the unsigned announcement.
275    pub machine_signature: Vec<u8>,
276    /// Reachability hints.
277    pub addresses: Vec<std::net::SocketAddr>,
278    /// Unix timestamp (seconds) of announcement creation.
279    pub announced_at: u64,
280    /// NAT type as detected by the network layer (e.g. "FullCone", "Symmetric").
281    /// `None` when the network is not yet started or NAT type is undetermined.
282    pub nat_type: Option<String>,
283    /// Whether the machine can receive direct inbound connections.
284    /// `None` when the network is not yet started.
285    pub can_receive_direct: Option<bool>,
286    /// Whether the machine is currently relaying traffic for peers behind strict NATs.
287    /// `None` when the network is not yet started.
288    pub is_relay: Option<bool>,
289    /// Whether the machine is coordinating NAT traversal hole-punch timing for peers.
290    /// `None` when the network is not yet started.
291    pub is_coordinator: Option<bool>,
292}
293
294impl IdentityAnnouncement {
295    fn to_unsigned(&self) -> IdentityAnnouncementUnsigned {
296        IdentityAnnouncementUnsigned {
297            agent_id: self.agent_id,
298            machine_id: self.machine_id,
299            user_id: self.user_id,
300            agent_certificate: self.agent_certificate.clone(),
301            machine_public_key: self.machine_public_key.clone(),
302            addresses: self.addresses.clone(),
303            announced_at: self.announced_at,
304            nat_type: self.nat_type.clone(),
305            can_receive_direct: self.can_receive_direct,
306            is_relay: self.is_relay,
307            is_coordinator: self.is_coordinator,
308        }
309    }
310
311    /// Verify machine-key attestation and optional user->agent certificate.
312    pub fn verify(&self) -> error::Result<()> {
313        let machine_pub =
314            ant_quic::MlDsaPublicKey::from_bytes(&self.machine_public_key).map_err(|_| {
315                error::IdentityError::CertificateVerification(
316                    "invalid machine public key in announcement".to_string(),
317                )
318            })?;
319        let derived_machine_id = identity::MachineId::from_public_key(&machine_pub);
320        if derived_machine_id != self.machine_id {
321            return Err(error::IdentityError::CertificateVerification(
322                "machine_id does not match machine public key".to_string(),
323            ));
324        }
325
326        let unsigned_bytes = bincode::serialize(&self.to_unsigned()).map_err(|e| {
327            error::IdentityError::Serialization(format!(
328                "failed to serialize announcement for verification: {e}"
329            ))
330        })?;
331        let signature = ant_quic::crypto::raw_public_keys::pqc::MlDsaSignature::from_bytes(
332            &self.machine_signature,
333        )
334        .map_err(|e| {
335            error::IdentityError::CertificateVerification(format!(
336                "invalid machine signature in announcement: {:?}",
337                e
338            ))
339        })?;
340        ant_quic::crypto::raw_public_keys::pqc::verify_with_ml_dsa(
341            &machine_pub,
342            &unsigned_bytes,
343            &signature,
344        )
345        .map_err(|e| {
346            error::IdentityError::CertificateVerification(format!(
347                "machine signature verification failed: {:?}",
348                e
349            ))
350        })?;
351
352        match (self.user_id, self.agent_certificate.as_ref()) {
353            (Some(user_id), Some(cert)) => {
354                cert.verify()?;
355                let cert_agent_id = cert.agent_id()?;
356                if cert_agent_id != self.agent_id {
357                    return Err(error::IdentityError::CertificateVerification(
358                        "agent certificate agent_id mismatch".to_string(),
359                    ));
360                }
361                let cert_user_id = cert.user_id()?;
362                if cert_user_id != user_id {
363                    return Err(error::IdentityError::CertificateVerification(
364                        "agent certificate user_id mismatch".to_string(),
365                    ));
366                }
367                Ok(())
368            }
369            (None, None) => Ok(()),
370            _ => Err(error::IdentityError::CertificateVerification(
371                "user identity disclosure requires matching certificate".to_string(),
372            )),
373        }
374    }
375}
376
377/// Cached discovery data derived from identity announcements.
378#[derive(Debug, Clone)]
379pub struct DiscoveredAgent {
380    /// Portable agent identity.
381    pub agent_id: identity::AgentId,
382    /// Machine identity.
383    pub machine_id: identity::MachineId,
384    /// Optional human identity (when consented and attested).
385    pub user_id: Option<identity::UserId>,
386    /// Reachability hints.
387    pub addresses: Vec<std::net::SocketAddr>,
388    /// Announcement timestamp from the sender.
389    pub announced_at: u64,
390    /// Local timestamp (seconds) when this record was last updated.
391    pub last_seen: u64,
392    /// Raw ML-DSA-65 machine public key bytes from the announcement.
393    ///
394    /// Used to verify rendezvous `ProviderSummary` signatures before
395    /// trusting addresses received via the rendezvous shard topic.
396    #[doc(hidden)]
397    pub machine_public_key: Vec<u8>,
398    /// NAT type reported by this agent (e.g. "FullCone", "Symmetric", "Unknown").
399    /// `None` if the agent did not include NAT information.
400    pub nat_type: Option<String>,
401    /// Whether this agent's machine can receive direct inbound connections.
402    /// `None` if not reported.
403    pub can_receive_direct: Option<bool>,
404    /// Whether this agent's machine is acting as a relay for peers behind strict NATs.
405    /// `None` if not reported.
406    pub is_relay: Option<bool>,
407    /// Whether this agent's machine is coordinating NAT traversal timing for peers.
408    /// `None` if not reported.
409    pub is_coordinator: Option<bool>,
410}
411
412/// Builder for configuring an [`Agent`] before connecting to the network.
413///
414/// The builder allows customization of the agent's identity:
415/// - Machine key path: Where to store/load the machine keypair
416/// - Agent keypair: Import a portable agent identity from another machine
417/// - User keypair: Bind a human identity to this agent
418///
419/// # Example
420///
421/// ```ignore
422/// use x0x::Agent;
423///
424/// // Default: auto-generates both keypairs
425/// let agent = Agent::builder()
426///     .build()
427///     .await?;
428///
429/// // Custom machine key path
430/// let agent = Agent::builder()
431///     .with_machine_key("/custom/path/machine.key")
432///     .build()
433///     .await?;
434///
435/// // Import agent keypair
436/// let agent_kp = load_agent_keypair()?;
437/// let agent = Agent::builder()
438///     .with_agent_key(agent_kp)
439///     .build()
440///     .await?;
441///
442/// // With user identity (three-layer)
443/// let agent = Agent::builder()
444///     .with_user_key_path("~/.x0x/user.key")
445///     .build()
446///     .await?;
447/// ```
448#[derive(Debug)]
449pub struct AgentBuilder {
450    machine_key_path: Option<std::path::PathBuf>,
451    agent_keypair: Option<identity::AgentKeypair>,
452    agent_key_path: Option<std::path::PathBuf>,
453    user_keypair: Option<identity::UserKeypair>,
454    user_key_path: Option<std::path::PathBuf>,
455    #[allow(dead_code)]
456    network_config: Option<network::NetworkConfig>,
457    peer_cache_dir: Option<std::path::PathBuf>,
458    heartbeat_interval_secs: Option<u64>,
459    identity_ttl_secs: Option<u64>,
460    /// Custom path for the contacts file.
461    contact_store_path: Option<std::path::PathBuf>,
462}
463
464/// Context captured by the background identity heartbeat task.
465struct HeartbeatContext {
466    identity: std::sync::Arc<identity::Identity>,
467    runtime: std::sync::Arc<gossip::GossipRuntime>,
468    network: std::sync::Arc<network::NetworkNode>,
469    interval_secs: u64,
470    cache: std::sync::Arc<
471        tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
472    >,
473}
474
475impl HeartbeatContext {
476    async fn announce(&self) -> error::Result<()> {
477        let machine_public_key = self
478            .identity
479            .machine_keypair()
480            .public_key()
481            .as_bytes()
482            .to_vec();
483        let announced_at = Agent::unix_timestamp_secs();
484
485        // Include ALL routable addresses (IPv4 and IPv6) so other agents
486        // can connect to us via whichever protocol they support.
487        let mut addresses = match self.network.node_status().await {
488            Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
489            _ => match self.network.routable_addr().await {
490                Some(addr) => vec![addr],
491                None => Vec::new(),
492            },
493        };
494
495        // Detect global IPv6 address locally (ant-quic currently only
496        // reports IPv4 via OBSERVED_ADDRESS). Uses UDP connect trick —
497        // no data is sent, the OS routing table resolves our source addr.
498        let port = addresses.first().map(|a| a.port()).unwrap_or(5483);
499        if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
500            if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
501                if let Ok(local) = sock.local_addr() {
502                    if let std::net::IpAddr::V6(v6) = local.ip() {
503                        let segs = v6.segments();
504                        let is_global = (segs[0] & 0xffc0) != 0xfe80
505                            && (segs[0] & 0xff00) != 0xfd00
506                            && !v6.is_loopback();
507                        if is_global {
508                            let v6_addr = std::net::SocketAddr::new(std::net::IpAddr::V6(v6), port);
509                            if !addresses.contains(&v6_addr) {
510                                addresses.push(v6_addr);
511                            }
512                        }
513                    }
514                }
515            }
516        }
517
518        // Query NAT and relay status from the network layer.
519        let (nat_type, can_receive_direct, is_relay, is_coordinator) =
520            match self.network.node_status().await {
521                Some(status) => (
522                    Some(status.nat_type.to_string()),
523                    Some(status.can_receive_direct),
524                    Some(status.is_relaying),
525                    Some(status.is_coordinating),
526                ),
527                None => (None, None, None, None),
528            };
529
530        let unsigned = IdentityAnnouncementUnsigned {
531            agent_id: self.identity.agent_id(),
532            machine_id: self.identity.machine_id(),
533            user_id: self
534                .identity
535                .user_keypair()
536                .map(identity::UserKeypair::user_id),
537            agent_certificate: self.identity.agent_certificate().cloned(),
538            machine_public_key: machine_public_key.clone(),
539            addresses,
540            announced_at,
541            nat_type: nat_type.clone(),
542            can_receive_direct,
543            is_relay,
544            is_coordinator,
545        };
546        let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
547            error::IdentityError::Serialization(format!(
548                "heartbeat: failed to serialize announcement: {e}"
549            ))
550        })?;
551        let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
552            self.identity.machine_keypair().secret_key(),
553            &unsigned_bytes,
554        )
555        .map_err(|e| {
556            error::IdentityError::Storage(std::io::Error::other(format!(
557                "heartbeat: failed to sign announcement: {:?}",
558                e
559            )))
560        })?
561        .as_bytes()
562        .to_vec();
563
564        let announcement = IdentityAnnouncement {
565            agent_id: unsigned.agent_id,
566            machine_id: unsigned.machine_id,
567            user_id: unsigned.user_id,
568            agent_certificate: unsigned.agent_certificate,
569            machine_public_key: machine_public_key.clone(),
570            machine_signature,
571            addresses: unsigned.addresses,
572            announced_at,
573            nat_type,
574            can_receive_direct,
575            is_relay,
576            is_coordinator,
577        };
578        let encoded = bincode::serialize(&announcement).map_err(|e| {
579            error::IdentityError::Serialization(format!(
580                "heartbeat: failed to serialize announcement: {e}"
581            ))
582        })?;
583        self.runtime
584            .pubsub()
585            .publish(
586                IDENTITY_ANNOUNCE_TOPIC.to_string(),
587                bytes::Bytes::from(encoded),
588            )
589            .await
590            .map_err(|e| {
591                error::IdentityError::Storage(std::io::Error::other(format!(
592                    "heartbeat: publish failed: {e}"
593                )))
594            })?;
595        let now = Agent::unix_timestamp_secs();
596        self.cache.write().await.insert(
597            announcement.agent_id,
598            DiscoveredAgent {
599                agent_id: announcement.agent_id,
600                machine_id: announcement.machine_id,
601                user_id: announcement.user_id,
602                addresses: announcement.addresses,
603                announced_at: announcement.announced_at,
604                last_seen: now,
605                machine_public_key: machine_public_key.clone(),
606                nat_type: announcement.nat_type.clone(),
607                can_receive_direct: announcement.can_receive_direct,
608                is_relay: announcement.is_relay,
609                is_coordinator: announcement.is_coordinator,
610            },
611        );
612        Ok(())
613    }
614}
615
616impl Agent {
617    /// Create a new agent with default configuration.
618    ///
619    /// This generates a fresh identity with both machine and agent keypairs.
620    /// The machine keypair is stored persistently in `~/.x0x/machine.key`.
621    ///
622    /// For more control, use [`Agent::builder()`].
623    pub async fn new() -> error::Result<Self> {
624        Agent::builder().build().await
625    }
626
627    /// Create an [`AgentBuilder`] for fine-grained configuration.
628    ///
629    /// The builder supports:
630    /// - Custom machine key path via `with_machine_key()`
631    /// - Imported agent keypair via `with_agent_key()`
632    /// - User identity via `with_user_key()` or `with_user_key_path()`
633    pub fn builder() -> AgentBuilder {
634        AgentBuilder {
635            machine_key_path: None,
636            agent_keypair: None,
637            agent_key_path: None,
638            user_keypair: None,
639            user_key_path: None,
640            network_config: None,
641            peer_cache_dir: None,
642            heartbeat_interval_secs: None,
643            identity_ttl_secs: None,
644            contact_store_path: None,
645        }
646    }
647
648    /// Get the agent's identity.
649    ///
650    /// # Returns
651    ///
652    /// A reference to the agent's [`identity::Identity`].
653    #[inline]
654    #[must_use]
655    pub fn identity(&self) -> &identity::Identity {
656        &self.identity
657    }
658
659    /// Get the machine ID for this agent.
660    ///
661    /// The machine ID is tied to this computer and used for QUIC transport
662    /// authentication. It is stored persistently in `~/.x0x/machine.key`.
663    ///
664    /// # Returns
665    ///
666    /// The agent's machine ID.
667    #[inline]
668    #[must_use]
669    pub fn machine_id(&self) -> identity::MachineId {
670        self.identity.machine_id()
671    }
672
673    /// Get the agent ID for this agent.
674    ///
675    /// The agent ID is portable across machines and represents the agent's
676    /// persistent identity. It can be exported and imported to run the same
677    /// agent on different computers.
678    ///
679    /// # Returns
680    ///
681    /// The agent's ID.
682    #[inline]
683    #[must_use]
684    pub fn agent_id(&self) -> identity::AgentId {
685        self.identity.agent_id()
686    }
687
688    /// Get the user ID for this agent, if a user identity is bound.
689    ///
690    /// Returns `None` if no user keypair was provided during construction.
691    /// User keys are opt-in — they are never auto-generated.
692    #[inline]
693    #[must_use]
694    pub fn user_id(&self) -> Option<identity::UserId> {
695        self.identity.user_id()
696    }
697
698    /// Get the agent certificate, if one exists.
699    ///
700    /// The certificate cryptographically binds this agent to a user identity.
701    #[inline]
702    #[must_use]
703    pub fn agent_certificate(&self) -> Option<&identity::AgentCertificate> {
704        self.identity.agent_certificate()
705    }
706
707    /// Get the network node, if initialized.
708    #[must_use]
709    pub fn network(&self) -> Option<&std::sync::Arc<network::NetworkNode>> {
710        self.network.as_ref()
711    }
712
713    /// Get a reference to the contact store.
714    ///
715    /// The contact store persists trust levels and machine records for known
716    /// agents. It is backed by `~/.x0x/contacts.json` by default.
717    ///
718    /// Use [`with_contact_store_path`](AgentBuilder::with_contact_store_path)
719    /// on the builder to customise the path.
720    #[must_use]
721    pub fn contacts(&self) -> &std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>> {
722        &self.contact_store
723    }
724
725    /// Get the reachability information for a discovered agent.
726    ///
727    /// Returns `None` if the agent is not in the discovery cache.
728    /// Use [`Agent::announce_identity`] or wait for a heartbeat announcement
729    /// to populate the cache.
730    pub async fn reachability(
731        &self,
732        agent_id: &identity::AgentId,
733    ) -> Option<connectivity::ReachabilityInfo> {
734        let cache = self.identity_discovery_cache.read().await;
735        cache
736            .get(agent_id)
737            .map(connectivity::ReachabilityInfo::from_discovered)
738    }
739
740    /// Attempt to connect to an agent by its identity.
741    ///
742    /// Looks up the agent in the discovery cache, then tries to establish
743    /// a QUIC connection using the best available strategy:
744    ///
745    /// 1. **Direct** — if the agent reports `can_receive_direct: true` or
746    ///    has a traversable NAT type, try each known address in order.
747    /// 2. **Coordinated** — if direct fails or the agent reports a symmetric
748    ///    NAT, the outcome is `Coordinated` if any address was reachable via
749    ///    the network layer's NAT traversal.
750    /// 3. **Unreachable** — no address succeeded.
751    /// 4. **NotFound** — the agent is not in the discovery cache.
752    ///
753    /// # Errors
754    ///
755    /// Returns an error only for internal failures (e.g. network not started).
756    /// Connectivity failures are reported as `ConnectOutcome::Unreachable`.
757    pub async fn connect_to_agent(
758        &self,
759        agent_id: &identity::AgentId,
760    ) -> error::Result<connectivity::ConnectOutcome> {
761        // 1. Look up in discovery cache
762        let discovered = {
763            let cache = self.identity_discovery_cache.read().await;
764            cache.get(agent_id).cloned()
765        };
766
767        let agent = match discovered {
768            Some(a) => a,
769            None => return Ok(connectivity::ConnectOutcome::NotFound),
770        };
771
772        let info = connectivity::ReachabilityInfo::from_discovered(&agent);
773
774        if info.addresses.is_empty() {
775            return Ok(connectivity::ConnectOutcome::Unreachable);
776        }
777
778        let Some(ref network) = self.network else {
779            return Ok(connectivity::ConnectOutcome::Unreachable);
780        };
781
782        // 2. Try direct connection if likely to succeed
783        if info.likely_direct() {
784            for addr in &info.addresses {
785                match network.connect_addr(*addr).await {
786                    Ok(_peer_id) => {
787                        // Enrich bootstrap cache with this successful address
788                        if let Some(ref bc) = self.bootstrap_cache {
789                            let peer_id = ant_quic::PeerId(agent.machine_id.0);
790                            bc.add_from_connection(peer_id, vec![*addr], None).await;
791                        }
792                        // Register agent mapping for direct messaging
793                        self.direct_messaging
794                            .mark_connected(agent.agent_id, agent.machine_id)
795                            .await;
796                        return Ok(connectivity::ConnectOutcome::Direct(*addr));
797                    }
798                    Err(e) => {
799                        tracing::debug!("Direct connect to {} failed: {}", addr, e);
800                    }
801                }
802            }
803        }
804
805        // 3. If direct failed and coordination may help, try remaining addresses
806        //    The network layer handles NAT traversal internally via QUIC extension frames.
807        if info.needs_coordination() || !info.likely_direct() {
808            for addr in &info.addresses {
809                match network.connect_addr(*addr).await {
810                    Ok(_peer_id) => {
811                        if let Some(ref bc) = self.bootstrap_cache {
812                            let peer_id = ant_quic::PeerId(agent.machine_id.0);
813                            bc.add_from_connection(peer_id, vec![*addr], None).await;
814                        }
815                        // Register agent mapping for direct messaging
816                        self.direct_messaging
817                            .mark_connected(agent.agent_id, agent.machine_id)
818                            .await;
819                        return Ok(connectivity::ConnectOutcome::Coordinated(*addr));
820                    }
821                    Err(e) => {
822                        tracing::debug!("Coordinated connect to {} failed: {}", addr, e);
823                    }
824                }
825            }
826        }
827
828        Ok(connectivity::ConnectOutcome::Unreachable)
829    }
830
831    /// Save the bootstrap cache and release resources.
832    ///
833    /// Call this before dropping the agent to ensure the peer cache is
834    /// persisted to disk. The background maintenance task saves periodically,
835    /// but this guarantees a final save.
836    pub async fn shutdown(&self) {
837        if let Some(ref cache) = self.bootstrap_cache {
838            if let Err(e) = cache.save().await {
839                tracing::warn!("Failed to save bootstrap cache on shutdown: {e}");
840            } else {
841                tracing::info!("Bootstrap cache saved on shutdown");
842            }
843        }
844    }
845
846    // === Direct Messaging ===
847
848    /// Send data directly to a connected agent.
849    ///
850    /// This bypasses gossip pub/sub for efficient point-to-point communication.
851    /// The agent must be connected first via [`Self::connect_to_agent`].
852    ///
853    /// # Arguments
854    ///
855    /// * `agent_id` - The target agent's identifier.
856    /// * `payload` - The data to send.
857    ///
858    /// # Errors
859    ///
860    /// Returns an error if:
861    /// - Network is not initialized
862    /// - Agent is not connected
863    /// - Agent is not found in discovery cache
864    /// - Send fails
865    ///
866    /// # Example
867    ///
868    /// ```rust,ignore
869    /// // First connect to the agent
870    /// let outcome = agent.connect_to_agent(&target_agent_id).await?;
871    ///
872    /// // Then send data directly
873    /// agent.send_direct(&target_agent_id, b"hello".to_vec()).await?;
874    /// ```
875    pub async fn send_direct(
876        &self,
877        agent_id: &identity::AgentId,
878        payload: Vec<u8>,
879    ) -> error::NetworkResult<()> {
880        let network = self.network.as_ref().ok_or_else(|| {
881            error::NetworkError::NodeCreation("network not initialized".to_string())
882        })?;
883
884        // Look up machine_id from discovery cache
885        let machine_id = {
886            let cache = self.identity_discovery_cache.read().await;
887            cache.get(agent_id).map(|d| d.machine_id)
888        }
889        .ok_or(error::NetworkError::AgentNotFound(agent_id.0))?;
890
891        // Check if connected
892        let ant_peer_id = ant_quic::PeerId(machine_id.0);
893        if !network.is_connected(&ant_peer_id).await {
894            return Err(error::NetworkError::AgentNotConnected(agent_id.0));
895        }
896
897        // Send via network layer
898        network
899            .send_direct(&ant_peer_id, &self.identity.agent_id().0, &payload)
900            .await?;
901
902        tracing::info!(
903            "Sent {} bytes directly to agent {:?}",
904            payload.len(),
905            agent_id
906        );
907
908        Ok(())
909    }
910
911    /// Receive the next direct message from any connected agent.
912    ///
913    /// Blocks until a direct message is received.
914    ///
915    /// # Security Note
916    ///
917    /// This method does **not** apply trust filtering from `ContactStore`.
918    /// Messages from blocked agents will still be delivered. Use
919    /// [`recv_direct_filtered()`](Self::recv_direct_filtered) if you need
920    /// trust-based filtering.
921    ///
922    /// # Returns
923    ///
924    /// The received [`DirectMessage`] containing sender, payload, and timestamp.
925    ///
926    /// # Example
927    ///
928    /// ```rust,ignore
929    /// loop {
930    ///     if let Some(msg) = agent.recv_direct().await {
931    ///         println!("From {:?}: {:?}", msg.sender, msg.payload_str());
932    ///     }
933    /// }
934    /// ```
935    pub async fn recv_direct(&self) -> Option<direct::DirectMessage> {
936        self.recv_direct_inner().await
937    }
938
939    /// Receive the next direct message, filtering by trust level.
940    ///
941    /// Messages from blocked agents are silently dropped. This mirrors the
942    /// behavior of gossip pub/sub message filtering.
943    ///
944    /// # Returns
945    ///
946    /// The received [`DirectMessage`], or `None` if the channel closes.
947    /// Messages from blocked senders are dropped and the method continues
948    /// waiting for the next acceptable message.
949    ///
950    /// # Example
951    ///
952    /// ```rust,ignore
953    /// // Block an agent
954    /// {
955    ///     let mut contacts = agent.contacts().write().await;
956    ///     contacts.set_trust(&bad_agent_id, TrustLevel::Blocked);
957    /// }
958    ///
959    /// // Messages from blocked agents are silently dropped
960    /// loop {
961    ///     if let Some(msg) = agent.recv_direct_filtered().await {
962    ///         // msg.sender is not in the blocked list
963    ///         // (note: sender is self-asserted, see DirectMessage docs)
964    ///     }
965    /// }
966    /// ```
967    pub async fn recv_direct_filtered(&self) -> Option<direct::DirectMessage> {
968        loop {
969            let msg = self.recv_direct_inner().await?;
970
971            // Check trust level
972            let contacts = self.contact_store.read().await;
973            if let Some(contact) = contacts.get(&msg.sender) {
974                if contact.trust_level == contacts::TrustLevel::Blocked {
975                    tracing::debug!(
976                        "Dropping direct message from blocked agent {:?}",
977                        msg.sender
978                    );
979                    continue;
980                }
981            }
982
983            return Some(msg);
984        }
985    }
986
987    /// Internal helper for receiving direct messages.
988    async fn recv_direct_inner(&self) -> Option<direct::DirectMessage> {
989        let network = self.network.as_ref()?;
990
991        // Get the raw message from network layer
992        let (ant_peer_id, payload) = network.recv_direct().await?;
993
994        // Parse sender agent_id from payload (first 32 bytes after stream type)
995        if payload.len() < 32 {
996            tracing::warn!("Direct message too short to contain sender agent_id");
997            return None;
998        }
999
1000        let mut sender_bytes = [0u8; 32];
1001        sender_bytes.copy_from_slice(&payload[..32]);
1002        let sender = identity::AgentId(sender_bytes);
1003        let machine_id = identity::MachineId(ant_peer_id.0);
1004        let data = payload[32..].to_vec();
1005
1006        // Register the mapping for future lookups
1007        self.direct_messaging
1008            .register_agent(sender, machine_id)
1009            .await;
1010
1011        Some(direct::DirectMessage::new(sender, machine_id, data))
1012    }
1013
1014    /// Subscribe to direct messages.
1015    ///
1016    /// Returns a receiver that can be cloned for multiple consumers.
1017    /// Messages are broadcast to all receivers.
1018    ///
1019    /// # Example
1020    ///
1021    /// ```rust,ignore
1022    /// let mut rx = agent.subscribe_direct();
1023    /// tokio::spawn(async move {
1024    ///     while let Some(msg) = rx.recv().await {
1025    ///         println!("Direct message: {:?}", msg);
1026    ///     }
1027    /// });
1028    /// ```
1029    pub fn subscribe_direct(&self) -> direct::DirectMessageReceiver {
1030        self.direct_messaging.subscribe()
1031    }
1032
1033    /// Get the direct messaging infrastructure.
1034    ///
1035    /// Provides low-level access to connection tracking and agent mappings.
1036    pub fn direct_messaging(&self) -> &std::sync::Arc<direct::DirectMessaging> {
1037        &self.direct_messaging
1038    }
1039
1040    /// Check if an agent is currently connected for direct messaging.
1041    ///
1042    /// # Arguments
1043    ///
1044    /// * `agent_id` - The agent to check.
1045    ///
1046    /// # Returns
1047    ///
1048    /// `true` if a QUIC connection exists to this agent's machine.
1049    pub async fn is_agent_connected(&self, agent_id: &identity::AgentId) -> bool {
1050        let Some(network) = &self.network else {
1051            return false;
1052        };
1053
1054        // Look up machine_id from discovery cache
1055        let machine_id = {
1056            let cache = self.identity_discovery_cache.read().await;
1057            cache.get(agent_id).map(|d| d.machine_id)
1058        };
1059
1060        match machine_id {
1061            Some(mid) => {
1062                let ant_peer_id = ant_quic::PeerId(mid.0);
1063                network.is_connected(&ant_peer_id).await
1064            }
1065            None => false,
1066        }
1067    }
1068
1069    /// Get list of currently connected agents.
1070    ///
1071    /// Returns agents that have been discovered and are currently connected
1072    /// via QUIC transport.
1073    pub async fn connected_agents(&self) -> Vec<identity::AgentId> {
1074        let Some(network) = &self.network else {
1075            return Vec::new();
1076        };
1077
1078        let connected_peers = network.connected_peers().await;
1079        let cache = self.identity_discovery_cache.read().await;
1080
1081        // Find agents whose machine_id matches a connected peer
1082        cache
1083            .values()
1084            .filter(|agent| {
1085                let ant_peer_id = ant_quic::PeerId(agent.machine_id.0);
1086                connected_peers.contains(&ant_peer_id)
1087            })
1088            .map(|agent| agent.agent_id)
1089            .collect()
1090    }
1091
1092    /// Attach a contact store for trust-based message filtering.
1093    ///
1094    /// When set, the gossip pub/sub layer will:
1095    /// - Drop messages from `Blocked` senders (don't deliver, don't rebroadcast)
1096    /// - Annotate messages with the sender's trust level for consumers
1097    ///
1098    /// Without a contact store, all messages pass through (open relay mode).
1099    pub fn set_contacts(&self, store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>) {
1100        if let Some(runtime) = &self.gossip_runtime {
1101            runtime.pubsub().set_contacts(store);
1102        }
1103    }
1104
1105    /// Announce this agent's identity on the network discovery topic.
1106    ///
1107    /// By default, announcements include agent + machine identity only.
1108    /// Human identity disclosure is opt-in and requires explicit consent.
1109    ///
1110    /// # Arguments
1111    ///
1112    /// * `include_user_identity` - Whether to include `user_id` and certificate
1113    /// * `human_consent` - Must be `true` when disclosing user identity
1114    ///
1115    /// # Errors
1116    ///
1117    /// Returns an error if:
1118    /// - Gossip runtime is not initialized
1119    /// - Human identity disclosure is requested without explicit consent
1120    /// - Human identity disclosure is requested but no user identity is configured
1121    /// - Serialization or publish fails
1122    pub async fn announce_identity(
1123        &self,
1124        include_user_identity: bool,
1125        human_consent: bool,
1126    ) -> error::Result<()> {
1127        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1128            error::IdentityError::Storage(std::io::Error::other(
1129                "gossip runtime not initialized - configure agent with network first",
1130            ))
1131        })?;
1132
1133        self.start_identity_listener().await?;
1134
1135        // Include ALL routable addresses (IPv4 and IPv6).
1136        let mut addresses = if let Some(network) = self.network.as_ref() {
1137            match network.node_status().await {
1138                Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
1139                _ => match network.routable_addr().await {
1140                    Some(addr) => vec![addr],
1141                    None => self.announcement_addresses(),
1142                },
1143            }
1144        } else {
1145            self.announcement_addresses()
1146        };
1147        // Detect addresses locally via UDP socket tricks.
1148        // ant-quic discovers public IPv4 via OBSERVED_ADDRESS from peers.
1149        // IPv6 is globally routable (no NAT), so we probe locally.
1150        let port = addresses.first().map(|a| a.port()).unwrap_or(5483);
1151
1152        // IPv6 probe
1153        if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
1154            if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
1155                if let Ok(local) = sock.local_addr() {
1156                    if let std::net::IpAddr::V6(v6) = local.ip() {
1157                        let segs = v6.segments();
1158                        let is_global = (segs[0] & 0xffc0) != 0xfe80
1159                            && (segs[0] & 0xff00) != 0xfd00
1160                            && !v6.is_loopback();
1161                        if is_global {
1162                            let v6_addr = std::net::SocketAddr::new(std::net::IpAddr::V6(v6), port);
1163                            if !addresses.contains(&v6_addr) {
1164                                addresses.push(v6_addr);
1165                            }
1166                        }
1167                    }
1168                }
1169            }
1170        }
1171        let announcement = self.build_identity_announcement_with_addrs(
1172            include_user_identity,
1173            human_consent,
1174            addresses,
1175        )?;
1176
1177        let encoded = bincode::serialize(&announcement).map_err(|e| {
1178            error::IdentityError::Serialization(format!(
1179                "failed to serialize identity announcement: {e}"
1180            ))
1181        })?;
1182
1183        let payload = bytes::Bytes::from(encoded);
1184
1185        // Publish to shard topic first (future-proof routing).
1186        let shard_topic = shard_topic_for_agent(&announcement.agent_id);
1187        runtime
1188            .pubsub()
1189            .publish(shard_topic, payload.clone())
1190            .await
1191            .map_err(|e| {
1192                error::IdentityError::Storage(std::io::Error::other(format!(
1193                    "failed to publish identity announcement to shard topic: {e}"
1194                )))
1195            })?;
1196
1197        // Also publish to legacy broadcast topic for backward compatibility.
1198        runtime
1199            .pubsub()
1200            .publish(IDENTITY_ANNOUNCE_TOPIC.to_string(), payload)
1201            .await
1202            .map_err(|e| {
1203                error::IdentityError::Storage(std::io::Error::other(format!(
1204                    "failed to publish identity announcement: {e}"
1205                )))
1206            })?;
1207
1208        let now = Self::unix_timestamp_secs();
1209        self.identity_discovery_cache.write().await.insert(
1210            announcement.agent_id,
1211            DiscoveredAgent {
1212                agent_id: announcement.agent_id,
1213                machine_id: announcement.machine_id,
1214                user_id: announcement.user_id,
1215                addresses: announcement.addresses.clone(),
1216                announced_at: announcement.announced_at,
1217                last_seen: now,
1218                machine_public_key: announcement.machine_public_key.clone(),
1219                nat_type: announcement.nat_type.clone(),
1220                can_receive_direct: announcement.can_receive_direct,
1221                is_relay: announcement.is_relay,
1222                is_coordinator: announcement.is_coordinator,
1223            },
1224        );
1225
1226        Ok(())
1227    }
1228
1229    /// Get all discovered agents from identity announcements.
1230    ///
1231    /// # Errors
1232    ///
1233    /// Returns an error if the gossip runtime is not initialized.
1234    pub async fn discovered_agents(&self) -> error::Result<Vec<DiscoveredAgent>> {
1235        self.start_identity_listener().await?;
1236        let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
1237        let mut agents: Vec<_> = self
1238            .identity_discovery_cache
1239            .read()
1240            .await
1241            .values()
1242            .filter(|a| a.announced_at >= cutoff)
1243            .cloned()
1244            .collect();
1245        agents.sort_by(|a, b| a.agent_id.0.cmp(&b.agent_id.0));
1246        Ok(agents)
1247    }
1248
1249    /// Return all discovered agents regardless of TTL.
1250    ///
1251    /// Unlike [`Self::discovered_agents`], this method skips TTL filtering and
1252    /// returns all cache entries, including stale ones. Useful for debugging.
1253    ///
1254    /// # Errors
1255    ///
1256    /// Returns an error if the gossip runtime is not initialized.
1257    pub async fn discovered_agents_unfiltered(&self) -> error::Result<Vec<DiscoveredAgent>> {
1258        self.start_identity_listener().await?;
1259        let mut agents: Vec<_> = self
1260            .identity_discovery_cache
1261            .read()
1262            .await
1263            .values()
1264            .cloned()
1265            .collect();
1266        agents.sort_by(|a, b| a.agent_id.0.cmp(&b.agent_id.0));
1267        Ok(agents)
1268    }
1269
1270    /// Get one discovered agent record by agent ID.
1271    ///
1272    /// # Errors
1273    ///
1274    /// Returns an error if the gossip runtime is not initialized.
1275    pub async fn discovered_agent(
1276        &self,
1277        agent_id: identity::AgentId,
1278    ) -> error::Result<Option<DiscoveredAgent>> {
1279        self.start_identity_listener().await?;
1280        Ok(self
1281            .identity_discovery_cache
1282            .read()
1283            .await
1284            .get(&agent_id)
1285            .cloned())
1286    }
1287
    /// Start the background identity-announcement listener, at most once.
    ///
    /// Subscribes to the legacy broadcast topic (`IDENTITY_ANNOUNCE_TOPIC`)
    /// and to this agent's own shard topic. It then spawns a detached task
    /// that processes each received announcement:
    ///
    /// 1. bincode-decode it (fixint encoding, size limited to
    ///    `crate::network::MAX_MESSAGE_DESERIALIZE_SIZE`, trailing bytes allowed);
    /// 2. check it with `IdentityAnnouncement::verify`;
    /// 3. drop it if the contact store's `TrustEvaluator` returns
    ///    `RejectBlocked` or `RejectMachineMismatch`;
    /// 4. record the announced machine for the agent in the contact store;
    /// 5. add any announced addresses to the bootstrap cache;
    /// 6. insert (overwriting) the agent's entry in the discovery cache;
    /// 7. for another agent that is not yet connected, spawn an auto-connect
    ///    attempt over the announced addresses. This happens at most once per
    ///    agent for the lifetime of the task.
    ///
    /// The task exits once both subscriptions have closed.
    ///
    /// The runtime check happens before the `identity_listener_started` flag
    /// is set, so a call without a gossip runtime does not mark the listener
    /// as started. Once the flag is set, later calls return `Ok(())`
    /// immediately.
    ///
    /// # Errors
    ///
    /// Returns an error if the gossip runtime is not initialized.
    async fn start_identity_listener(&self) -> error::Result<()> {
        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
            error::IdentityError::Storage(std::io::Error::other(
                "gossip runtime not initialized - configure agent with network first",
            ))
        })?;

        // `swap` returns the previous value, so only the first caller gets past
        // this check and spawns the listener task.
        if self
            .identity_listener_started
            .swap(true, std::sync::atomic::Ordering::AcqRel)
        {
            return Ok(());
        }

        let mut sub_legacy = runtime
            .pubsub()
            .subscribe(IDENTITY_ANNOUNCE_TOPIC.to_string())
            .await;
        let own_shard_topic = shard_topic_for_agent(&self.agent_id());
        let mut sub_shard = runtime.pubsub().subscribe(own_shard_topic).await;
        // The spawned task must be 'static, so it gets its own clones of the
        // shared handles instead of borrowing `self`.
        let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
        let bootstrap_cache = self.bootstrap_cache.clone();
        let contact_store = std::sync::Arc::clone(&self.contact_store);
        let network = self.network.as_ref().map(std::sync::Arc::clone);
        let own_agent_id = self.agent_id();

        tokio::spawn(async move {
            // Track agents we've already initiated auto-connect to, preventing
            // duplicate connection attempts from concurrent announcements.
            // NOTE(review): entries are never removed. An agent whose
            // auto-connect failed, or which later disconnects, is not auto-dialed
            // again by this task, even if it re-announces with new addresses.
            // Confirm this is intended.
            let mut auto_connect_attempted = std::collections::HashSet::<identity::AgentId>::new();

            loop {
                // Drain whichever subscription fires next; deduplicate by AgentId in cache.
                // A branch whose `recv()` yields `None` (subscription closed) is
                // disabled; `else` runs once both are closed, ending the task.
                let msg = tokio::select! {
                    Some(m) = sub_legacy.recv() => m,
                    Some(m) = sub_shard.recv() => m,
                    else => break,
                };
                let decoded = {
                    use bincode::Options;
                    bincode::options()
                        .with_fixint_encoding()
                        .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
                        .allow_trailing_bytes()
                        .deserialize::<IdentityAnnouncement>(&msg.payload)
                };
                let announcement = match decoded {
                    Ok(a) => a,
                    Err(e) => {
                        tracing::debug!("Ignoring invalid identity announcement payload: {}", e);
                        continue;
                    }
                };

                if let Err(e) = announcement.verify() {
                    tracing::warn!("Ignoring unverifiable identity announcement: {}", e);
                    continue;
                }

                // Evaluate trust for this (agent, machine) pair.
                // Blocked or machine-pinning violations are silently dropped.
                // Every other decision lets the announcement through.
                {
                    let store = contact_store.read().await;
                    let evaluator = trust::TrustEvaluator::new(&store);
                    let decision = evaluator.evaluate(&trust::TrustContext {
                        agent_id: &announcement.agent_id,
                        machine_id: &announcement.machine_id,
                    });
                    match decision {
                        trust::TrustDecision::RejectBlocked => {
                            tracing::debug!(
                                "Dropping identity announcement from blocked agent {:?}",
                                hex::encode(&announcement.agent_id.0[..8]),
                            );
                            continue;
                        }
                        trust::TrustDecision::RejectMachineMismatch => {
                            tracing::warn!(
                                "Dropping identity announcement from agent {:?}: machine {:?} not in pinned list",
                                hex::encode(&announcement.agent_id.0[..8]),
                                hex::encode(&announcement.machine_id.0[..8]),
                            );
                            continue;
                        }
                        _ => {}
                    }
                }

                // Update machine records in the contact store.
                // The read lock above has been released before this write lock
                // is taken.
                {
                    let mut store = contact_store.write().await;
                    let record = contacts::MachineRecord::new(announcement.machine_id, None);
                    store.add_machine(&announcement.agent_id, record);
                }

                // Local receive time, stored as `last_seen`. `announced_at`
                // keeps the sender's own timestamp.
                let now = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .map_or(0, |d| d.as_secs());

                // Add announced addresses to the bootstrap cache so auto-dial
                // can connect to peers discovered via gossip announcements.
                // After identity unification, machine_id == ant-quic PeerId.
                if !announcement.addresses.is_empty() {
                    if let Some(ref bc) = &bootstrap_cache {
                        let peer_id = ant_quic::PeerId(announcement.machine_id.0);
                        bc.add_from_connection(peer_id, announcement.addresses.clone(), None)
                            .await;
                        tracing::debug!(
                            "Added {} addresses from identity announcement to bootstrap cache for agent {:?} (machine {:?})",
                            announcement.addresses.len(),
                            announcement.agent_id,
                            hex::encode(&announcement.machine_id.0[..8]),
                        );
                    }
                }

                // NOTE(review): the entry is overwritten unconditionally. An older
                // but validly signed announcement (e.g. a replay) would replace a
                // newer one unless `verify()` rejects stale announcements. Consider
                // comparing `announced_at` with the cached entry; TODO confirm.
                // Our own announcements, if gossip echoes them back, are also
                // processed by the steps above; only auto-connect skips
                // `own_agent_id`.
                cache.write().await.insert(
                    announcement.agent_id,
                    DiscoveredAgent {
                        agent_id: announcement.agent_id,
                        machine_id: announcement.machine_id,
                        user_id: announcement.user_id,
                        addresses: announcement.addresses.clone(),
                        announced_at: announcement.announced_at,
                        last_seen: now,
                        machine_public_key: announcement.machine_public_key.clone(),
                        nat_type: announcement.nat_type.clone(),
                        can_receive_direct: announcement.can_receive_direct,
                        is_relay: announcement.is_relay,
                        is_coordinator: announcement.is_coordinator,
                    },
                );

                // Auto-connect to discovered agents so pub/sub messages can route
                // between peers that share bootstrap nodes but aren't directly connected.
                // The gossip topology refresh (every 1s) will add the new peer to
                // PlumTree topic trees once the QUIC connection is established.
                if announcement.agent_id != own_agent_id
                    && !announcement.addresses.is_empty()
                    && !auto_connect_attempted.contains(&announcement.agent_id)
                {
                    if let Some(ref net) = &network {
                        let ant_peer = ant_quic::PeerId(announcement.machine_id.0);
                        if !net.is_connected(&ant_peer).await {
                            auto_connect_attempted.insert(announcement.agent_id);
                            let net = std::sync::Arc::clone(net);
                            let addresses = announcement.addresses.clone();
                            // Dial in a separate task so slow connection attempts
                            // do not stall processing of further announcements.
                            // Addresses are tried in order; the first success wins.
                            tokio::spawn(async move {
                                for addr in &addresses {
                                    match net.connect_addr(*addr).await {
                                        Ok(_) => {
                                            tracing::info!(
                                                "Auto-connected to discovered agent at {addr}",
                                            );
                                            return;
                                        }
                                        Err(e) => {
                                            tracing::debug!("Auto-connect to {addr} failed: {e}",);
                                        }
                                    }
                                }
                                tracing::debug!(
                                    "Auto-connect exhausted all {} addresses for discovered agent",
                                    addresses.len(),
                                );
                            });
                        }
                    }
                }
            }
        });

        Ok(())
    }
1462
1463    fn unix_timestamp_secs() -> u64 {
1464        std::time::SystemTime::now()
1465            .duration_since(std::time::UNIX_EPOCH)
1466            .map_or(0, |d| d.as_secs())
1467    }
1468
1469    fn announcement_addresses(&self) -> Vec<std::net::SocketAddr> {
1470        // Try routable_addr synchronously via local_addr fallback.
1471        // The async routable_addr is used in HeartbeatContext::announce().
1472        match self.network.as_ref().and_then(|n| n.local_addr()) {
1473            Some(addr) if addr.port() > 0 && !addr.ip().is_unspecified() => vec![addr],
1474            _ => Vec::new(),
1475        }
1476    }
1477
1478    fn build_identity_announcement(
1479        &self,
1480        include_user_identity: bool,
1481        human_consent: bool,
1482    ) -> error::Result<IdentityAnnouncement> {
1483        self.build_identity_announcement_with_addrs(
1484            include_user_identity,
1485            human_consent,
1486            self.announcement_addresses(),
1487        )
1488    }
1489
1490    fn build_identity_announcement_with_addrs(
1491        &self,
1492        include_user_identity: bool,
1493        human_consent: bool,
1494        addresses: Vec<std::net::SocketAddr>,
1495    ) -> error::Result<IdentityAnnouncement> {
1496        if include_user_identity && !human_consent {
1497            return Err(error::IdentityError::Storage(std::io::Error::other(
1498                "human identity disclosure requires explicit human consent — set human_consent: true in the request body",
1499            )));
1500        }
1501
1502        let (user_id, agent_certificate) = if include_user_identity {
1503            let user_id = self.user_id().ok_or_else(|| {
1504                error::IdentityError::Storage(std::io::Error::other(
1505                    "human identity disclosure requested but no user identity is configured — set user_key_path in your config.toml to point at your user keypair file",
1506                ))
1507            })?;
1508            let cert = self.agent_certificate().cloned().ok_or_else(|| {
1509                error::IdentityError::Storage(std::io::Error::other(
1510                    "human identity disclosure requested but agent certificate is missing",
1511                ))
1512            })?;
1513            (Some(user_id), Some(cert))
1514        } else {
1515            (None, None)
1516        };
1517
1518        let machine_public_key = self
1519            .identity
1520            .machine_keypair()
1521            .public_key()
1522            .as_bytes()
1523            .to_vec();
1524
1525        // NAT status is populated by the heartbeat loop (which is async and has
1526        // access to NodeStatus). Here we use None for the NAT fields as this
1527        // sync builder doesn't have async access to the network layer.
1528        let unsigned = IdentityAnnouncementUnsigned {
1529            agent_id: self.agent_id(),
1530            machine_id: self.machine_id(),
1531            user_id,
1532            agent_certificate: agent_certificate.clone(),
1533            machine_public_key: machine_public_key.clone(),
1534            addresses,
1535            announced_at: Self::unix_timestamp_secs(),
1536            nat_type: None,
1537            can_receive_direct: None,
1538            is_relay: None,
1539            is_coordinator: None,
1540        };
1541        let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
1542            error::IdentityError::Serialization(format!(
1543                "failed to serialize unsigned identity announcement: {e}"
1544            ))
1545        })?;
1546        let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
1547            self.identity.machine_keypair().secret_key(),
1548            &unsigned_bytes,
1549        )
1550        .map_err(|e| {
1551            error::IdentityError::Storage(std::io::Error::other(format!(
1552                "failed to sign identity announcement with machine key: {:?}",
1553                e
1554            )))
1555        })?
1556        .as_bytes()
1557        .to_vec();
1558
1559        Ok(IdentityAnnouncement {
1560            agent_id: unsigned.agent_id,
1561            machine_id: unsigned.machine_id,
1562            user_id: unsigned.user_id,
1563            agent_certificate: unsigned.agent_certificate,
1564            machine_public_key,
1565            machine_signature,
1566            addresses: unsigned.addresses,
1567            announced_at: unsigned.announced_at,
1568            nat_type: unsigned.nat_type,
1569            can_receive_direct: unsigned.can_receive_direct,
1570            is_relay: unsigned.is_relay,
1571            is_coordinator: unsigned.is_coordinator,
1572        })
1573    }
1574
1575    /// Join the x0x gossip network.
1576    ///
1577    /// Connects to bootstrap peers in parallel with automatic retries.
1578    /// Failed connections are retried after a delay to allow stale
1579    /// connections on remote nodes to expire.
1580    ///
1581    /// If the agent was not configured with a network, this method
1582    /// succeeds gracefully (nothing to join).
1583    pub async fn join_network(&self) -> error::Result<()> {
1584        let Some(network) = self.network.as_ref() else {
1585            tracing::debug!("join_network called but no network configured");
1586            return Ok(());
1587        };
1588
1589        if let Some(ref runtime) = self.gossip_runtime {
1590            runtime.start().await.map_err(|e| {
1591                error::IdentityError::Storage(std::io::Error::other(format!(
1592                    "failed to start gossip runtime: {e}"
1593                )))
1594            })?;
1595            tracing::info!("Gossip runtime started");
1596        }
1597        self.start_identity_listener().await?;
1598
1599        let bootstrap_nodes = network.config().bootstrap_nodes.clone();
1600        if bootstrap_nodes.is_empty() {
1601            tracing::debug!("No bootstrap peers configured");
1602            if let Err(e) = self.announce_identity(false, false).await {
1603                tracing::warn!("Initial identity announcement failed: {}", e);
1604            }
1605            if let Err(e) = self.start_identity_heartbeat().await {
1606                tracing::warn!("Failed to start identity heartbeat: {e}");
1607            }
1608            return Ok(());
1609        }
1610
1611        let min_connected = 3;
1612        let mut all_connected: Vec<std::net::SocketAddr> = Vec::new();
1613
1614        // Phase 1: Try cached peers first using the real ant-quic peer IDs.
1615        if let Some(ref cache) = self.bootstrap_cache {
1616            const PHASE1_PEER_CANDIDATES: usize = 12;
1617            let cached_peers = cache.select_peers(PHASE1_PEER_CANDIDATES).await;
1618            if !cached_peers.is_empty() {
1619                tracing::info!("Phase 1: Trying {} cached peers", cached_peers.len());
1620                let (succeeded, _failed) = self
1621                    .connect_cached_peers_parallel_tracked(network, &cached_peers)
1622                    .await;
1623                all_connected.extend(&succeeded);
1624                tracing::info!(
1625                    "Phase 1: {}/{} cached peers connected",
1626                    succeeded.len(),
1627                    cached_peers.len()
1628                );
1629            }
1630        }
1631
1632        // Phase 2: Connect to hardcoded bootstrap nodes if we need more peers.
1633        if all_connected.len() < min_connected {
1634            let remaining: Vec<std::net::SocketAddr> = bootstrap_nodes
1635                .iter()
1636                .filter(|addr| !all_connected.contains(addr))
1637                .copied()
1638                .collect();
1639
1640            // Round 1: Connect to all bootstrap peers in parallel
1641            let (succeeded, mut failed) = self
1642                .connect_peers_parallel_tracked(network, &remaining)
1643                .await;
1644            all_connected.extend(&succeeded);
1645            tracing::info!(
1646                "Phase 2 round 1: {}/{} bootstrap peers connected",
1647                succeeded.len(),
1648                remaining.len()
1649            );
1650
1651            // Retry rounds for failed peers
1652            for round in 2..=3 {
1653                if failed.is_empty() {
1654                    break;
1655                }
1656                let delay = std::time::Duration::from_secs(if round == 2 { 10 } else { 15 });
1657                tracing::info!(
1658                    "Retrying {} failed peers in {}s (round {})",
1659                    failed.len(),
1660                    delay.as_secs(),
1661                    round
1662                );
1663                tokio::time::sleep(delay).await;
1664
1665                let (succeeded, still_failed) =
1666                    self.connect_peers_parallel_tracked(network, &failed).await;
1667                all_connected.extend(&succeeded);
1668                failed = still_failed;
1669                tracing::info!(
1670                    "Phase 2 round {}: {} total peers connected",
1671                    round,
1672                    all_connected.len()
1673                );
1674            }
1675
1676            if !failed.is_empty() {
1677                tracing::warn!(
1678                    "Could not connect to {} bootstrap peers: {:?}",
1679                    failed.len(),
1680                    failed
1681                );
1682            }
1683        }
1684
1685        tracing::info!(
1686            "Network join complete. Connected to {} peers.",
1687            all_connected.len()
1688        );
1689
1690        // Join the HyParView membership overlay via connected peers.
1691        if let Some(ref runtime) = self.gossip_runtime {
1692            let seeds: Vec<String> = all_connected.iter().map(|addr| addr.to_string()).collect();
1693            if !seeds.is_empty() {
1694                if let Err(e) = runtime.membership().join(seeds).await {
1695                    tracing::warn!("HyParView membership join failed: {e}");
1696                }
1697            }
1698        }
1699
1700        if let Err(e) = self.announce_identity(false, false).await {
1701            tracing::warn!("Initial identity announcement failed: {}", e);
1702        }
1703        if let Err(e) = self.start_identity_heartbeat().await {
1704            tracing::warn!("Failed to start identity heartbeat: {e}");
1705        }
1706
1707        Ok(())
1708    }
1709
1710    /// Connect to cached peers in parallel, returning (succeeded, failed) peer lists.
1711    async fn connect_cached_peers_parallel_tracked(
1712        &self,
1713        network: &std::sync::Arc<network::NetworkNode>,
1714        peers: &[ant_quic::CachedPeer],
1715    ) -> (Vec<std::net::SocketAddr>, Vec<ant_quic::PeerId>) {
1716        let handles: Vec<_> = peers
1717            .iter()
1718            .map(|peer| {
1719                let net = network.clone();
1720                let peer_id = peer.peer_id;
1721                tokio::spawn(async move {
1722                    tracing::debug!("Connecting to cached peer: {:?}", peer_id);
1723                    match net.connect_cached_peer(peer_id).await {
1724                        Ok(addr) => {
1725                            tracing::info!("Connected to cached peer {:?} at {}", peer_id, addr);
1726                            Ok(addr)
1727                        }
1728                        Err(e) => {
1729                            tracing::warn!("Failed to connect to cached peer {:?}: {}", peer_id, e);
1730                            Err(peer_id)
1731                        }
1732                    }
1733                })
1734            })
1735            .collect();
1736
1737        let mut succeeded = Vec::new();
1738        let mut failed = Vec::new();
1739        for handle in handles {
1740            match handle.await {
1741                Ok(Ok(addr)) => succeeded.push(addr),
1742                Ok(Err(peer_id)) => failed.push(peer_id),
1743                Err(e) => tracing::error!("Connection task panicked: {}", e),
1744            }
1745        }
1746        (succeeded, failed)
1747    }
1748
1749    /// Connect to multiple peers in parallel, returning (succeeded, failed) address lists.
1750    async fn connect_peers_parallel_tracked(
1751        &self,
1752        network: &std::sync::Arc<network::NetworkNode>,
1753        addrs: &[std::net::SocketAddr],
1754    ) -> (Vec<std::net::SocketAddr>, Vec<std::net::SocketAddr>) {
1755        let handles: Vec<_> = addrs
1756            .iter()
1757            .map(|addr| {
1758                let net = network.clone();
1759                let addr = *addr;
1760                tokio::spawn(async move {
1761                    tracing::debug!("Connecting to peer: {}", addr);
1762                    match net.connect_addr(addr).await {
1763                        Ok(_) => {
1764                            tracing::info!("Connected to peer: {}", addr);
1765                            Ok(addr)
1766                        }
1767                        Err(e) => {
1768                            tracing::warn!("Failed to connect to {}: {}", addr, e);
1769                            Err(addr)
1770                        }
1771                    }
1772                })
1773            })
1774            .collect();
1775
1776        let mut succeeded = Vec::new();
1777        let mut failed = Vec::new();
1778        for handle in handles {
1779            match handle.await {
1780                Ok(Ok(addr)) => succeeded.push(addr),
1781                Ok(Err(addr)) => failed.push(addr),
1782                Err(e) => tracing::error!("Connection task panicked: {}", e),
1783            }
1784        }
1785        (succeeded, failed)
1786    }
1787
1788    /// Subscribe to messages on a given topic.
1789    ///
1790    /// Returns a [`gossip::Subscription`] that yields messages as they arrive
1791    /// through the gossip network.
1792    ///
1793    /// # Errors
1794    ///
1795    /// Returns an error if:
1796    /// - Gossip runtime is not initialized (configure agent with network first)
1797    pub async fn subscribe(&self, topic: &str) -> error::Result<Subscription> {
1798        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1799            error::IdentityError::Storage(std::io::Error::other(
1800                "gossip runtime not initialized - configure agent with network first",
1801            ))
1802        })?;
1803        Ok(runtime.pubsub().subscribe(topic.to_string()).await)
1804    }
1805
1806    /// Publish a message to a topic.
1807    ///
1808    /// The message will propagate through the gossip network via
1809    /// epidemic broadcast — every agent that receives it will
1810    /// relay it to its neighbours.
1811    ///
1812    /// # Errors
1813    ///
1814    /// Returns an error if:
1815    /// - Gossip runtime is not initialized (configure agent with network first)
1816    /// - Message encoding or broadcast fails
1817    pub async fn publish(&self, topic: &str, payload: Vec<u8>) -> error::Result<()> {
1818        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1819            error::IdentityError::Storage(std::io::Error::other(
1820                "gossip runtime not initialized - configure agent with network first",
1821            ))
1822        })?;
1823        runtime
1824            .pubsub()
1825            .publish(topic.to_string(), bytes::Bytes::from(payload))
1826            .await
1827            .map_err(|e| {
1828                error::IdentityError::Storage(std::io::Error::other(format!(
1829                    "publish failed: {}",
1830                    e
1831                )))
1832            })
1833    }
1834
1835    /// Get connected peer IDs.
1836    ///
1837    /// Returns the list of peers currently connected via the gossip network.
1838    ///
1839    /// # Errors
1840    ///
1841    /// Returns an error if the network is not initialized.
1842    pub async fn peers(&self) -> error::Result<Vec<saorsa_gossip_types::PeerId>> {
1843        let network = self.network.as_ref().ok_or_else(|| {
1844            error::IdentityError::Storage(std::io::Error::other(
1845                "network not initialized - configure agent with network first",
1846            ))
1847        })?;
1848        let ant_peers = network.connected_peers().await;
1849        Ok(ant_peers
1850            .into_iter()
1851            .map(|p| saorsa_gossip_types::PeerId::new(p.0))
1852            .collect())
1853    }
1854
1855    /// Get online agents.
1856    ///
1857    /// Returns agent IDs discovered from signed identity announcements.
1858    ///
1859    /// # Errors
1860    ///
1861    /// Returns an error if the gossip runtime is not initialized.
1862    pub async fn presence(&self) -> error::Result<Vec<identity::AgentId>> {
1863        self.start_identity_listener().await?;
1864        let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
1865        let mut agents: Vec<_> = self
1866            .identity_discovery_cache
1867            .read()
1868            .await
1869            .values()
1870            .filter(|a| a.announced_at >= cutoff)
1871            .map(|a| a.agent_id)
1872            .collect();
1873        agents.sort_by(|a, b| a.0.cmp(&b.0));
1874        Ok(agents)
1875    }
1876
1877    /// Find an agent by ID, returning its known addresses.
1878    ///
1879    /// Performs a three-stage lookup:
1880    /// 1. **Cache hit** — return addresses immediately if the agent has already
1881    ///    been discovered.
1882    /// 2. **Shard subscription** — subscribe to the agent's identity shard topic
1883    ///    and wait up to 5 seconds for a heartbeat announcement.
1884    /// 3. **Rendezvous** — subscribe to the agent's rendezvous shard topic and
1885    ///    wait up to 5 seconds for a `ProviderSummary` advertisement.  This
1886    ///    works even when the two agents are on different gossip overlay clusters.
1887    ///
1888    /// Returns `None` if the agent is not found within the combined deadline.
1889    ///
1890    /// # Errors
1891    ///
1892    /// Returns an error if the gossip runtime is not initialized.
1893    pub async fn find_agent(
1894        &self,
1895        agent_id: identity::AgentId,
1896    ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
1897        self.start_identity_listener().await?;
1898
1899        // Stage 1: cache hit.
1900        if let Some(addrs) = self
1901            .identity_discovery_cache
1902            .read()
1903            .await
1904            .get(&agent_id)
1905            .map(|e| e.addresses.clone())
1906        {
1907            return Ok(Some(addrs));
1908        }
1909
1910        // Stage 2: subscribe to the agent's identity shard topic and wait up to 5 s.
1911        let runtime = match self.gossip_runtime.as_ref() {
1912            Some(r) => r,
1913            None => return Ok(None),
1914        };
1915        let shard_topic = shard_topic_for_agent(&agent_id);
1916        let mut sub = runtime.pubsub().subscribe(shard_topic).await;
1917        let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
1918        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
1919
1920        loop {
1921            if tokio::time::Instant::now() >= deadline {
1922                break;
1923            }
1924            let timeout = tokio::time::sleep_until(deadline);
1925            tokio::select! {
1926                Some(msg) = sub.recv() => {
1927                    if let Ok(ann) = {
1928                        use bincode::Options;
1929                        bincode::DefaultOptions::new()
1930                            .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
1931                            .deserialize::<IdentityAnnouncement>(&msg.payload)
1932                    } {
1933                        if ann.verify().is_ok() && ann.agent_id == agent_id {
1934                            let now = std::time::SystemTime::now()
1935                                .duration_since(std::time::UNIX_EPOCH)
1936                                .map_or(0, |d| d.as_secs());
1937                            let addrs = ann.addresses.clone();
1938                            cache.write().await.insert(
1939                                ann.agent_id,
1940                                DiscoveredAgent {
1941                                    agent_id: ann.agent_id,
1942                                    machine_id: ann.machine_id,
1943                                    user_id: ann.user_id,
1944                                    addresses: ann.addresses,
1945                                    announced_at: ann.announced_at,
1946                                    last_seen: now,
1947                                    machine_public_key: ann.machine_public_key.clone(),
1948                                    nat_type: ann.nat_type.clone(),
1949                                    can_receive_direct: ann.can_receive_direct,
1950                                    is_relay: ann.is_relay,
1951                                    is_coordinator: ann.is_coordinator,
1952                                },
1953                            );
1954                            return Ok(Some(addrs));
1955                        }
1956                    }
1957                }
1958                _ = timeout => break,
1959            }
1960        }
1961
1962        // Stage 3: rendezvous shard subscription — wait up to 5 s.
1963        if let Some(addrs) = self.find_agent_rendezvous(agent_id, 5).await? {
1964            return Ok(Some(addrs));
1965        }
1966
1967        Ok(None)
1968    }
1969
1970    /// Find all discovered agents claiming ownership by the given [`identity::UserId`].
1971    ///
1972    /// Only returns agents that announced with `include_user_identity: true`
1973    /// (i.e., agents whose [`DiscoveredAgent::user_id`] is `Some`).
1974    ///
1975    /// # Arguments
1976    ///
1977    /// * `user_id` - The user identity to search for
1978    ///
1979    /// # Errors
1980    ///
1981    /// Returns an error if the gossip runtime is not initialized.
1982    pub async fn find_agents_by_user(
1983        &self,
1984        user_id: identity::UserId,
1985    ) -> error::Result<Vec<DiscoveredAgent>> {
1986        self.start_identity_listener().await?;
1987        let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
1988        Ok(self
1989            .identity_discovery_cache
1990            .read()
1991            .await
1992            .values()
1993            .filter(|a| a.announced_at >= cutoff && a.user_id == Some(user_id))
1994            .cloned()
1995            .collect())
1996    }
1997
1998    /// Return the local socket address this agent's network node is bound to, if any.
1999    ///
2000    /// Returns `None` if no network has been configured or if the bind address is
2001    /// not yet known.
2002    #[must_use]
2003    pub fn local_addr(&self) -> Option<std::net::SocketAddr> {
2004        self.network.as_ref().and_then(|n| n.local_addr())
2005    }
2006
2007    /// Build a signed [`IdentityAnnouncement`] for this agent.
2008    ///
2009    /// Delegates to the internal `build_identity_announcement` method.
2010    ///
2011    /// # Errors
2012    ///
2013    /// Returns an error if key signing fails or human consent is required but not given.
2014    pub fn build_announcement(
2015        &self,
2016        include_user: bool,
2017        consent: bool,
2018    ) -> error::Result<IdentityAnnouncement> {
2019        self.build_identity_announcement(include_user, consent)
2020    }
2021
2022    /// Start the background identity heartbeat task.
2023    ///
2024    /// Idempotent — if the heartbeat is already running, returns `Ok(())` immediately.
2025    /// The heartbeat re-announces this agent's identity at `heartbeat_interval_secs`
2026    /// intervals so that late-joining peers can discover it without waiting for a
2027    /// new announcement.
2028    ///
2029    /// Called automatically by [`Agent::join_network`].
2030    ///
2031    /// # Errors
2032    ///
2033    /// Returns an error if a required network or gossip component is missing.
2034    pub async fn start_identity_heartbeat(&self) -> error::Result<()> {
2035        let mut handle_guard = self.heartbeat_handle.lock().await;
2036        if handle_guard.is_some() {
2037            return Ok(());
2038        }
2039        let Some(runtime) = self.gossip_runtime.as_ref().map(std::sync::Arc::clone) else {
2040            return Err(error::IdentityError::Storage(std::io::Error::other(
2041                "gossip runtime not initialized — cannot start heartbeat",
2042            )));
2043        };
2044        let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
2045            return Err(error::IdentityError::Storage(std::io::Error::other(
2046                "network not initialized — cannot start heartbeat",
2047            )));
2048        };
2049        let ctx = HeartbeatContext {
2050            identity: std::sync::Arc::clone(&self.identity),
2051            runtime,
2052            network,
2053            interval_secs: self.heartbeat_interval_secs,
2054            cache: std::sync::Arc::clone(&self.identity_discovery_cache),
2055        };
2056        let handle = tokio::task::spawn(async move {
2057            let mut ticker =
2058                tokio::time::interval(std::time::Duration::from_secs(ctx.interval_secs));
2059            ticker.tick().await; // skip first immediate tick
2060            loop {
2061                ticker.tick().await;
2062                if let Err(e) = ctx.announce().await {
2063                    tracing::warn!("identity heartbeat announce failed: {e}");
2064                }
2065            }
2066        });
2067        *handle_guard = Some(handle);
2068        Ok(())
2069    }
2070
2071    /// Publish a rendezvous `ProviderSummary` for this agent.
2072    ///
2073    /// Enables global findability across gossip overlay partitions.  Seekers
2074    /// that have never been on the same partition as this agent can still
2075    /// discover it by subscribing to the rendezvous shard topic and waiting
2076    /// for the next heartbeat advertisement.
2077    ///
2078    /// The summary is signed with this agent's machine key and contains the
2079    /// agent's reachability addresses in the `extensions` field (bincode-encoded
2080    /// `Vec<SocketAddr>`).
2081    ///
2082    /// # Re-advertisement contract
2083    ///
2084    /// Rendezvous summaries expire after `validity_ms` milliseconds.  **Callers
2085    /// are responsible for calling `advertise_identity` again before expiry** so
2086    /// that seekers can always find a fresh record.  A common strategy is to
2087    /// re-advertise every `validity_ms / 2`.  The `x0xd` daemon does this
2088    /// automatically via its background re-advertisement task.
2089    ///
2090    /// # Arguments
2091    ///
2092    /// * `validity_ms` — How long (milliseconds) before the summary expires.
2093    ///   After this time, seekers will no longer discover this agent via rendezvous
2094    ///   unless a fresh `advertise_identity` call is made.
2095    ///
2096    /// # Errors
2097    ///
2098    /// Returns an error if the gossip runtime is not initialized, serialization
2099    /// fails, or signing fails.
2100    pub async fn advertise_identity(&self, validity_ms: u64) -> error::Result<()> {
2101        use saorsa_gossip_rendezvous::{Capability, ProviderSummary};
2102
2103        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2104            error::IdentityError::Storage(std::io::Error::other(
2105                "gossip runtime not initialized — cannot advertise identity",
2106            ))
2107        })?;
2108
2109        let peer_id = runtime.peer_id();
2110        let addresses = self.announcement_addresses();
2111        let addr_bytes = bincode::serialize(&addresses).map_err(|e| {
2112            error::IdentityError::Serialization(format!(
2113                "failed to serialize addresses for rendezvous: {e}"
2114            ))
2115        })?;
2116
2117        let mut summary = ProviderSummary::new(
2118            self.agent_id().0,
2119            peer_id,
2120            vec![Capability::Identity],
2121            validity_ms,
2122        )
2123        .with_extensions(addr_bytes);
2124
2125        summary
2126            .sign_raw(self.identity.machine_keypair().secret_key().as_bytes())
2127            .map_err(|e| {
2128                error::IdentityError::Storage(std::io::Error::other(format!(
2129                    "failed to sign rendezvous summary: {e}"
2130                )))
2131            })?;
2132
2133        let cbor_bytes = summary.to_cbor().map_err(|e| {
2134            error::IdentityError::Serialization(format!(
2135                "failed to CBOR-encode rendezvous summary: {e}"
2136            ))
2137        })?;
2138
2139        let topic = rendezvous_shard_topic_for_agent(&self.agent_id());
2140        runtime
2141            .pubsub()
2142            .publish(topic, bytes::Bytes::from(cbor_bytes))
2143            .await
2144            .map_err(|e| {
2145                error::IdentityError::Storage(std::io::Error::other(format!(
2146                    "failed to publish rendezvous summary: {e}"
2147                )))
2148            })?;
2149
2150        self.rendezvous_advertised
2151            .store(true, std::sync::atomic::Ordering::Relaxed);
2152        Ok(())
2153    }
2154
2155    /// Search for an agent via rendezvous shard subscription.
2156    ///
2157    /// Subscribes to the rendezvous shard topic for `agent_id` and waits up to
2158    /// `timeout_secs` for a matching [`saorsa_gossip_rendezvous::ProviderSummary`].
2159    /// On success the addresses encoded in the summary `extensions` field are
2160    /// returned.
2161    ///
2162    /// This is Stage 3 of [`Agent::find_agent`]'s lookup cascade.
2163    ///
2164    /// # Errors
2165    ///
2166    /// Returns an error if the gossip runtime is not initialized.
2167    pub async fn find_agent_rendezvous(
2168        &self,
2169        agent_id: identity::AgentId,
2170        timeout_secs: u64,
2171    ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
2172        use saorsa_gossip_rendezvous::ProviderSummary;
2173
2174        let runtime = match self.gossip_runtime.as_ref() {
2175            Some(r) => r,
2176            None => return Ok(None),
2177        };
2178
2179        let topic = rendezvous_shard_topic_for_agent(&agent_id);
2180        let mut sub = runtime.pubsub().subscribe(topic).await;
2181        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
2182
2183        loop {
2184            if tokio::time::Instant::now() >= deadline {
2185                break;
2186            }
2187            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
2188            tokio::select! {
2189                Some(msg) = sub.recv() => {
2190                    let summary = match ProviderSummary::from_cbor(&msg.payload) {
2191                        Ok(s) => s,
2192                        Err(_) => continue,
2193                    };
2194                    if summary.target != agent_id.0 {
2195                        continue;
2196                    }
2197                    // Verify the summary signature when the advertiser's machine
2198                    // public key is cached from a prior identity announcement.
2199                    // Without a cached key we still accept the addresses — they
2200                    // are connection hints only; the subsequent QUIC handshake will
2201                    // fail cryptographically if the endpoint is not the genuine agent.
2202                    let cached_pub = self
2203                        .identity_discovery_cache
2204                        .read()
2205                        .await
2206                        .get(&agent_id)
2207                        .map(|e| e.machine_public_key.clone());
2208                    if let Some(pub_bytes) = cached_pub {
2209                        if !pub_bytes.is_empty()
2210                            && !summary.verify_raw(&pub_bytes).unwrap_or(false)
2211                        {
2212                            tracing::warn!(
2213                                "Rendezvous summary signature verification failed for agent {:?}; discarding",
2214                                agent_id
2215                            );
2216                            continue;
2217                        }
2218                    }
2219                    // Decode addresses from the extensions field.
2220                    let addrs: Vec<std::net::SocketAddr> = summary
2221                        .extensions
2222                        .as_deref()
2223                        .and_then(|b| {
2224                            use bincode::Options;
2225                            bincode::DefaultOptions::new()
2226                                .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
2227                                .deserialize(b)
2228                                .ok()
2229                        })
2230                        .unwrap_or_default();
2231                    if !addrs.is_empty() {
2232                        return Ok(Some(addrs));
2233                    }
2234                }
2235                _ = tokio::time::sleep(remaining) => break,
2236            }
2237        }
2238
2239        Ok(None)
2240    }
2241
2242    /// Insert a discovered agent into the cache (for testing only).
2243    ///
2244    /// # Arguments
2245    ///
2246    /// * `agent` - The agent entry to insert.
2247    #[doc(hidden)]
2248    pub async fn insert_discovered_agent_for_testing(&self, agent: DiscoveredAgent) {
2249        self.identity_discovery_cache
2250            .write()
2251            .await
2252            .insert(agent.agent_id, agent);
2253    }
2254
2255    /// Create a new collaborative task list bound to a topic.
2256    ///
2257    /// Creates a new `TaskList` and binds it to the specified gossip topic
2258    /// for automatic synchronization with other agents on the same topic.
2259    ///
2260    /// # Arguments
2261    ///
2262    /// * `name` - Human-readable name for the task list
2263    /// * `topic` - Gossip topic for synchronization
2264    ///
2265    /// # Returns
2266    ///
2267    /// A `TaskListHandle` for interacting with the task list.
2268    ///
2269    /// # Errors
2270    ///
2271    /// Returns an error if the gossip runtime is not initialized.
2272    ///
2273    /// # Example
2274    ///
2275    /// ```ignore
2276    /// let list = agent.create_task_list("Sprint Planning", "team-sprint").await?;
2277    /// ```
2278    pub async fn create_task_list(&self, name: &str, topic: &str) -> error::Result<TaskListHandle> {
2279        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2280            error::IdentityError::Storage(std::io::Error::other(
2281                "gossip runtime not initialized - configure agent with network first",
2282            ))
2283        })?;
2284
2285        let peer_id = runtime.peer_id();
2286        let list_id = crdt::TaskListId::from_content(name, &self.agent_id(), 0);
2287        let task_list = crdt::TaskList::new(list_id, name.to_string(), peer_id);
2288
2289        let sync = crdt::TaskListSync::new(
2290            task_list,
2291            std::sync::Arc::clone(runtime.pubsub()),
2292            topic.to_string(),
2293            30,
2294        )
2295        .map_err(|e| {
2296            error::IdentityError::Storage(std::io::Error::other(format!(
2297                "task list sync creation failed: {}",
2298                e
2299            )))
2300        })?;
2301
2302        let sync = std::sync::Arc::new(sync);
2303        sync.start().await.map_err(|e| {
2304            error::IdentityError::Storage(std::io::Error::other(format!(
2305                "task list sync start failed: {}",
2306                e
2307            )))
2308        })?;
2309
2310        Ok(TaskListHandle {
2311            sync,
2312            agent_id: self.agent_id(),
2313            peer_id,
2314        })
2315    }
2316
2317    /// Join an existing task list by topic.
2318    ///
2319    /// Connects to a task list that was created by another agent on the
2320    /// specified topic. The local replica will sync with peers automatically.
2321    ///
2322    /// # Arguments
2323    ///
2324    /// * `topic` - Gossip topic for the task list
2325    ///
2326    /// # Returns
2327    ///
2328    /// A `TaskListHandle` for interacting with the task list.
2329    ///
2330    /// # Errors
2331    ///
2332    /// Returns an error if the gossip runtime is not initialized.
2333    ///
2334    /// # Example
2335    ///
2336    /// ```ignore
2337    /// let list = agent.join_task_list("team-sprint").await?;
2338    /// ```
2339    pub async fn join_task_list(&self, topic: &str) -> error::Result<TaskListHandle> {
2340        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2341            error::IdentityError::Storage(std::io::Error::other(
2342                "gossip runtime not initialized - configure agent with network first",
2343            ))
2344        })?;
2345
2346        let peer_id = runtime.peer_id();
2347        // Create empty task list; it will be populated via delta sync
2348        let list_id = crdt::TaskListId::from_content(topic, &self.agent_id(), 0);
2349        let task_list = crdt::TaskList::new(list_id, String::new(), peer_id);
2350
2351        let sync = crdt::TaskListSync::new(
2352            task_list,
2353            std::sync::Arc::clone(runtime.pubsub()),
2354            topic.to_string(),
2355            30,
2356        )
2357        .map_err(|e| {
2358            error::IdentityError::Storage(std::io::Error::other(format!(
2359                "task list sync creation failed: {}",
2360                e
2361            )))
2362        })?;
2363
2364        let sync = std::sync::Arc::new(sync);
2365        sync.start().await.map_err(|e| {
2366            error::IdentityError::Storage(std::io::Error::other(format!(
2367                "task list sync start failed: {}",
2368                e
2369            )))
2370        })?;
2371
2372        Ok(TaskListHandle {
2373            sync,
2374            agent_id: self.agent_id(),
2375            peer_id,
2376        })
2377    }
2378}
2379
2380impl AgentBuilder {
2381    /// Set a custom path for the machine keypair.
2382    ///
2383    /// If not set, the machine keypair is stored in `~/.x0x/machine.key`.
2384    ///
2385    /// # Arguments
2386    ///
2387    /// * `path` - The path to store the machine keypair.
2388    ///
2389    /// # Returns
2390    ///
2391    /// Self for chaining.
2392    pub fn with_machine_key<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2393        self.machine_key_path = Some(path.as_ref().to_path_buf());
2394        self
2395    }
2396
2397    /// Import an agent keypair.
2398    ///
2399    /// If not set, the agent keypair is loaded from storage (or generated fresh
2400    /// if no stored key exists).
2401    ///
2402    /// This enables running the same agent on multiple machines by importing
2403    /// the same agent keypair (but with different machine keypairs).
2404    ///
2405    /// Note: When an explicit keypair is provided via this method, it takes
2406    /// precedence over `with_agent_key_path()`.
2407    ///
2408    /// # Arguments
2409    ///
2410    /// * `keypair` - The agent keypair to import.
2411    ///
2412    /// # Returns
2413    ///
2414    /// Self for chaining.
2415    pub fn with_agent_key(mut self, keypair: identity::AgentKeypair) -> Self {
2416        self.agent_keypair = Some(keypair);
2417        self
2418    }
2419
2420    /// Set a custom path for the agent keypair.
2421    ///
2422    /// If not set, the agent keypair is stored in `~/.x0x/agent.key`.
2423    /// If no stored key is found at the path, a fresh one is generated and saved.
2424    ///
2425    /// This is ignored when `with_agent_key()` provides an explicit keypair.
2426    ///
2427    /// # Arguments
2428    ///
2429    /// * `path` - The path to store/load the agent keypair.
2430    ///
2431    /// # Returns
2432    ///
2433    /// Self for chaining.
2434    pub fn with_agent_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2435        self.agent_key_path = Some(path.as_ref().to_path_buf());
2436        self
2437    }
2438
2439    /// Set network configuration for P2P communication.
2440    ///
2441    /// If not set, default network configuration is used.
2442    ///
2443    /// # Arguments
2444    ///
2445    /// * `config` - The network configuration to use.
2446    ///
2447    /// # Returns
2448    ///
2449    /// Self for chaining.
2450    pub fn with_network_config(mut self, config: network::NetworkConfig) -> Self {
2451        self.network_config = Some(config);
2452        self
2453    }
2454
2455    /// Set the directory for the bootstrap peer cache.
2456    ///
2457    /// The cache persists peer quality metrics across restarts, enabling
2458    /// cache-first join strategy. Defaults to `~/.x0x/peers/` if not set.
2459    /// Falls back to `./.x0x/peers/` (relative to CWD) if `$HOME` is unset.
2460    pub fn with_peer_cache_dir<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2461        self.peer_cache_dir = Some(path.as_ref().to_path_buf());
2462        self
2463    }
2464
2465    /// Import a user keypair for three-layer identity.
2466    ///
2467    /// This binds a human identity to this agent. When provided, an
2468    /// [`identity::AgentCertificate`] is automatically issued (if one
2469    /// doesn't already exist in storage) to cryptographically attest
2470    /// that this agent belongs to the user.
2471    ///
2472    /// Note: When an explicit keypair is provided via this method, it takes
2473    /// precedence over `with_user_key_path()`.
2474    ///
2475    /// # Arguments
2476    ///
2477    /// * `keypair` - The user keypair to import.
2478    ///
2479    /// # Returns
2480    ///
2481    /// Self for chaining.
2482    pub fn with_user_key(mut self, keypair: identity::UserKeypair) -> Self {
2483        self.user_keypair = Some(keypair);
2484        self
2485    }
2486
2487    /// Set a custom path for the user keypair.
2488    ///
2489    /// Unlike machine and agent keys, user keys are **not** auto-generated.
2490    /// If the file at this path doesn't exist, no user identity is set
2491    /// (the agent operates with two-layer identity).
2492    ///
2493    /// This is ignored when `with_user_key()` provides an explicit keypair.
2494    ///
2495    /// # Arguments
2496    ///
2497    /// * `path` - The path to load the user keypair from.
2498    ///
2499    /// # Returns
2500    ///
2501    /// Self for chaining.
2502    pub fn with_user_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2503        self.user_key_path = Some(path.as_ref().to_path_buf());
2504        self
2505    }
2506
2507    /// Set the identity heartbeat re-announcement interval.
2508    ///
2509    /// Defaults to [`IDENTITY_HEARTBEAT_INTERVAL_SECS`] (300 seconds).
2510    ///
2511    /// # Arguments
2512    ///
2513    /// * `secs` - Interval in seconds between identity re-announcements.
2514    #[must_use]
2515    pub fn with_heartbeat_interval(mut self, secs: u64) -> Self {
2516        self.heartbeat_interval_secs = Some(secs);
2517        self
2518    }
2519
2520    /// Set the identity cache TTL.
2521    ///
2522    /// Cache entries with `last_seen` older than this threshold are filtered
2523    /// from [`Agent::presence`] and [`Agent::discovered_agents`].
2524    ///
2525    /// Defaults to [`IDENTITY_TTL_SECS`] (900 seconds).
2526    ///
2527    /// # Arguments
2528    ///
2529    /// * `secs` - Time-to-live in seconds for discovered agent entries.
2530    #[must_use]
2531    pub fn with_identity_ttl(mut self, secs: u64) -> Self {
2532        self.identity_ttl_secs = Some(secs);
2533        self
2534    }
2535
2536    /// Set a custom path for the contacts file.
2537    ///
2538    /// The contacts file persists trust levels and machine records for known
2539    /// agents. Defaults to `~/.x0x/contacts.json` if not set.
2540    ///
2541    /// # Arguments
2542    ///
2543    /// * `path` - The path for the contacts file.
2544    #[must_use]
2545    pub fn with_contact_store_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
2546        self.contact_store_path = Some(path.as_ref().to_path_buf());
2547        self
2548    }
2549
2550    /// Build and initialise the agent.
2551    ///
2552    /// This performs the following:
2553    /// 1. Loads or generates the machine keypair (stored in `~/.x0x/machine.key` by default)
2554    /// 2. Uses provided agent keypair or generates a fresh one
2555    /// 3. Combines both into a unified Identity
2556    ///
2557    /// The machine keypair is automatically persisted to storage.
2558    ///
2559    /// # Errors
2560    ///
2561    /// Returns an error if:
2562    /// - Machine keypair generation fails
2563    /// - Storage I/O fails
2564    /// - Keypair deserialization fails
2565    pub async fn build(self) -> error::Result<Agent> {
2566        // Determine machine keypair source
2567        let machine_keypair = if let Some(path) = self.machine_key_path {
2568            // Try to load from custom path
2569            match storage::load_machine_keypair_from(&path).await {
2570                Ok(kp) => kp,
2571                Err(_) => {
2572                    // Generate fresh keypair and save to custom path
2573                    let kp = identity::MachineKeypair::generate()?;
2574                    storage::save_machine_keypair_to(&kp, &path).await?;
2575                    kp
2576                }
2577            }
2578        } else if storage::machine_keypair_exists().await {
2579            // Load default machine keypair
2580            storage::load_machine_keypair().await?
2581        } else {
2582            // Generate and save default machine keypair
2583            let kp = identity::MachineKeypair::generate()?;
2584            storage::save_machine_keypair(&kp).await?;
2585            kp
2586        };
2587
2588        // Resolve agent keypair: explicit > path-based > default storage > generate
2589        let agent_keypair = if let Some(kp) = self.agent_keypair {
2590            // Explicit keypair takes highest precedence
2591            kp
2592        } else if let Some(path) = self.agent_key_path {
2593            // Custom path: load or generate+save
2594            match storage::load_agent_keypair_from(&path).await {
2595                Ok(kp) => kp,
2596                Err(_) => {
2597                    let kp = identity::AgentKeypair::generate()?;
2598                    storage::save_agent_keypair_to(&kp, &path).await?;
2599                    kp
2600                }
2601            }
2602        } else if storage::agent_keypair_exists().await {
2603            // Default path exists: load it
2604            storage::load_agent_keypair_default().await?
2605        } else {
2606            // No stored key: generate and persist
2607            let kp = identity::AgentKeypair::generate()?;
2608            storage::save_agent_keypair_default(&kp).await?;
2609            kp
2610        };
2611
2612        // Resolve user keypair: explicit > path-based > default storage > None (opt-in)
2613        let user_keypair = if let Some(kp) = self.user_keypair {
2614            Some(kp)
2615        } else if let Some(path) = self.user_key_path {
2616            // Custom path: load if exists, otherwise None (don't auto-generate)
2617            storage::load_user_keypair_from(&path).await.ok()
2618        } else if storage::user_keypair_exists().await {
2619            // Default path exists: load it
2620            storage::load_user_keypair().await.ok()
2621        } else {
2622            None
2623        };
2624
2625        // Build identity with optional user layer
2626        let identity = if let Some(user_kp) = user_keypair {
2627            // Try to load existing certificate, or issue a new one
2628            // IMPORTANT: Verify the cert matches the current user key
2629            let cert = if storage::agent_certificate_exists().await {
2630                match storage::load_agent_certificate().await {
2631                    Ok(c) => {
2632                        // Verify cert is for the current user - if not, re-issue
2633                        let cert_matches_user = c
2634                            .user_id()
2635                            .map(|uid| uid == user_kp.user_id())
2636                            .unwrap_or(false);
2637                        if cert_matches_user {
2638                            c
2639                        } else {
2640                            // Cert was for a different user, issue new one
2641                            let new_cert =
2642                                identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
2643                            storage::save_agent_certificate(&new_cert).await?;
2644                            new_cert
2645                        }
2646                    }
2647                    Err(_) => {
2648                        let c = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
2649                        storage::save_agent_certificate(&c).await?;
2650                        c
2651                    }
2652                }
2653            } else {
2654                let c = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
2655                storage::save_agent_certificate(&c).await?;
2656                c
2657            };
2658            identity::Identity::new_with_user(machine_keypair, agent_keypair, user_kp, cert)
2659        } else {
2660            identity::Identity::new(machine_keypair, agent_keypair)
2661        };
2662
2663        // Open bootstrap peer cache if network will be configured
2664        let bootstrap_cache = if self.network_config.is_some() {
2665            let cache_dir = self.peer_cache_dir.unwrap_or_else(|| {
2666                dirs::home_dir()
2667                    .unwrap_or_else(|| std::path::PathBuf::from("."))
2668                    .join(".x0x")
2669                    .join("peers")
2670            });
2671            let config = ant_quic::BootstrapCacheConfig::builder()
2672                .cache_dir(cache_dir)
2673                .min_peers_to_save(1)
2674                .build();
2675            match ant_quic::BootstrapCache::open(config).await {
2676                Ok(cache) => {
2677                    let cache = std::sync::Arc::new(cache);
2678                    std::sync::Arc::clone(&cache).start_maintenance();
2679                    Some(cache)
2680                }
2681                Err(e) => {
2682                    tracing::warn!("Failed to open bootstrap cache: {e}");
2683                    None
2684                }
2685            }
2686        } else {
2687            None
2688        };
2689
2690        // Create network node if configured
2691        // Pass the machine keypair so ant-quic PeerId == MachineId (identity unification)
2692        let machine_keypair = {
2693            let pk = ant_quic::MlDsaPublicKey::from_bytes(
2694                identity.machine_keypair().public_key().as_bytes(),
2695            )
2696            .map_err(|e| {
2697                error::IdentityError::Storage(std::io::Error::other(format!(
2698                    "invalid machine public key: {e}"
2699                )))
2700            })?;
2701            let sk = ant_quic::MlDsaSecretKey::from_bytes(
2702                identity.machine_keypair().secret_key().as_bytes(),
2703            )
2704            .map_err(|e| {
2705                error::IdentityError::Storage(std::io::Error::other(format!(
2706                    "invalid machine secret key: {e}"
2707                )))
2708            })?;
2709            Some((pk, sk))
2710        };
2711
2712        let network = if let Some(config) = self.network_config {
2713            let node = network::NetworkNode::new(config, bootstrap_cache.clone(), machine_keypair)
2714                .await
2715                .map_err(|e| {
2716                    error::IdentityError::Storage(std::io::Error::other(format!(
2717                        "network initialization failed: {}",
2718                        e
2719                    )))
2720                })?;
2721
2722            // Verify identity unification: ant-quic PeerId must equal MachineId
2723            debug_assert_eq!(
2724                node.peer_id().0,
2725                identity.machine_id().0,
2726                "ant-quic PeerId must equal MachineId after identity unification"
2727            );
2728
2729            Some(std::sync::Arc::new(node))
2730        } else {
2731            None
2732        };
2733
2734        // Create signing context from agent keypair for message authentication
2735        let signing_ctx = std::sync::Arc::new(gossip::SigningContext::from_keypair(
2736            identity.agent_keypair(),
2737        ));
2738
2739        // Create gossip runtime if network exists
2740        let gossip_runtime = if let Some(ref net) = network {
2741            let runtime = gossip::GossipRuntime::new(
2742                gossip::GossipConfig::default(),
2743                std::sync::Arc::clone(net),
2744                Some(signing_ctx),
2745            )
2746            .await
2747            .map_err(|e| {
2748                error::IdentityError::Storage(std::io::Error::other(format!(
2749                    "gossip runtime initialization failed: {}",
2750                    e
2751                )))
2752            })?;
2753            Some(std::sync::Arc::new(runtime))
2754        } else {
2755            None
2756        };
2757
2758        // Initialise contact store
2759        let contacts_path = self.contact_store_path.unwrap_or_else(|| {
2760            dirs::home_dir()
2761                .unwrap_or_else(|| std::path::PathBuf::from("."))
2762                .join(".x0x")
2763                .join("contacts.json")
2764        });
2765        let contact_store = std::sync::Arc::new(tokio::sync::RwLock::new(
2766            contacts::ContactStore::new(contacts_path),
2767        ));
2768
2769        // Initialize direct messaging infrastructure
2770        let direct_messaging = std::sync::Arc::new(direct::DirectMessaging::new());
2771
2772        Ok(Agent {
2773            identity: std::sync::Arc::new(identity),
2774            network,
2775            gossip_runtime,
2776            bootstrap_cache,
2777            identity_discovery_cache: std::sync::Arc::new(tokio::sync::RwLock::new(
2778                std::collections::HashMap::new(),
2779            )),
2780            identity_listener_started: std::sync::atomic::AtomicBool::new(false),
2781            heartbeat_interval_secs: self
2782                .heartbeat_interval_secs
2783                .unwrap_or(IDENTITY_HEARTBEAT_INTERVAL_SECS),
2784            identity_ttl_secs: self.identity_ttl_secs.unwrap_or(IDENTITY_TTL_SECS),
2785            heartbeat_handle: tokio::sync::Mutex::new(None),
2786            rendezvous_advertised: std::sync::atomic::AtomicBool::new(false),
2787            contact_store,
2788            direct_messaging,
2789        })
2790    }
2791}
2792
2793/// Handle for interacting with a collaborative task list.
2794///
2795/// Provides a safe, concurrent interface to a TaskList backed by
2796/// CRDT synchronization. All operations are async and return Results.
2797///
2798/// # Example
2799///
2800/// ```ignore
2801/// let handle = agent.create_task_list("My List", "topic").await?;
2802/// let task_id = handle.add_task("Write docs".to_string(), "API docs".to_string()).await?;
2803/// handle.claim_task(task_id).await?;
2804/// handle.complete_task(task_id).await?;
2805/// ```
2806#[derive(Clone)]
2807pub struct TaskListHandle {
2808    sync: std::sync::Arc<crdt::TaskListSync>,
2809    agent_id: identity::AgentId,
2810    peer_id: saorsa_gossip_types::PeerId,
2811}
2812
2813impl std::fmt::Debug for TaskListHandle {
2814    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2815        f.debug_struct("TaskListHandle")
2816            .field("agent_id", &self.agent_id)
2817            .field("peer_id", &self.peer_id)
2818            .finish_non_exhaustive()
2819    }
2820}
2821
2822impl TaskListHandle {
2823    /// Add a new task to the list.
2824    ///
2825    /// # Arguments
2826    ///
2827    /// * `title` - Task title
2828    /// * `description` - Task description
2829    ///
2830    /// # Returns
2831    ///
2832    /// The TaskId of the created task.
2833    ///
2834    /// # Errors
2835    ///
2836    /// Returns an error if the task cannot be added.
2837    pub async fn add_task(
2838        &self,
2839        title: String,
2840        description: String,
2841    ) -> error::Result<crdt::TaskId> {
2842        let (task_id, delta) = {
2843            let mut list = self.sync.write().await;
2844            let seq = list.next_seq();
2845            let task_id = crdt::TaskId::new(&title, &self.agent_id, seq);
2846            let metadata = crdt::TaskMetadata::new(title, description, 128, self.agent_id, seq);
2847            let task = crdt::TaskItem::new(task_id, metadata, self.peer_id);
2848            list.add_task(task.clone(), self.peer_id, seq)
2849                .map_err(|e| {
2850                    error::IdentityError::Storage(std::io::Error::other(format!(
2851                        "add_task failed: {}",
2852                        e
2853                    )))
2854                })?;
2855            let tag = (self.peer_id, seq);
2856            let delta = crdt::TaskListDelta::for_add(task_id, task, tag, list.current_version());
2857            (task_id, delta)
2858        };
2859        // Best-effort replication: local mutation succeeded regardless
2860        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
2861            tracing::warn!("failed to publish add_task delta: {}", e);
2862        }
2863        Ok(task_id)
2864    }
2865
2866    /// Claim a task in the list.
2867    ///
2868    /// # Arguments
2869    ///
2870    /// * `task_id` - ID of the task to claim
2871    ///
2872    /// # Errors
2873    ///
2874    /// Returns an error if the task cannot be claimed.
2875    pub async fn claim_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
2876        let delta = {
2877            let mut list = self.sync.write().await;
2878            let seq = list.next_seq();
2879            list.claim_task(&task_id, self.agent_id, self.peer_id, seq)
2880                .map_err(|e| {
2881                    error::IdentityError::Storage(std::io::Error::other(format!(
2882                        "claim_task failed: {}",
2883                        e
2884                    )))
2885                })?;
2886            // Include full task so receivers can upsert if add hasn't arrived yet
2887            let full_task = list
2888                .get_task(&task_id)
2889                .ok_or_else(|| {
2890                    error::IdentityError::Storage(std::io::Error::other(
2891                        "task disappeared after claim",
2892                    ))
2893                })?
2894                .clone();
2895            crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
2896        };
2897        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
2898            tracing::warn!("failed to publish claim_task delta: {}", e);
2899        }
2900        Ok(())
2901    }
2902
2903    /// Complete a task in the list.
2904    ///
2905    /// # Arguments
2906    ///
2907    /// * `task_id` - ID of the task to complete
2908    ///
2909    /// # Errors
2910    ///
2911    /// Returns an error if the task cannot be completed.
2912    pub async fn complete_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
2913        let delta = {
2914            let mut list = self.sync.write().await;
2915            let seq = list.next_seq();
2916            list.complete_task(&task_id, self.agent_id, self.peer_id, seq)
2917                .map_err(|e| {
2918                    error::IdentityError::Storage(std::io::Error::other(format!(
2919                        "complete_task failed: {}",
2920                        e
2921                    )))
2922                })?;
2923            let full_task = list
2924                .get_task(&task_id)
2925                .ok_or_else(|| {
2926                    error::IdentityError::Storage(std::io::Error::other(
2927                        "task disappeared after complete",
2928                    ))
2929                })?
2930                .clone();
2931            crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
2932        };
2933        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
2934            tracing::warn!("failed to publish complete_task delta: {}", e);
2935        }
2936        Ok(())
2937    }
2938
2939    /// List all tasks in their current order.
2940    ///
2941    /// # Returns
2942    ///
2943    /// A vector of `TaskSnapshot` representing the current state.
2944    ///
2945    /// # Errors
2946    ///
2947    /// Returns an error if the task list cannot be read.
2948    pub async fn list_tasks(&self) -> error::Result<Vec<TaskSnapshot>> {
2949        let list = self.sync.read().await;
2950        let tasks = list.tasks_ordered();
2951        Ok(tasks
2952            .into_iter()
2953            .map(|task| TaskSnapshot {
2954                id: *task.id(),
2955                title: task.title().to_string(),
2956                description: task.description().to_string(),
2957                state: task.current_state(),
2958                assignee: task.assignee().copied(),
2959                owner: None,
2960                priority: task.priority(),
2961            })
2962            .collect())
2963    }
2964
2965    /// Reorder tasks in the list.
2966    ///
2967    /// # Arguments
2968    ///
2969    /// * `task_ids` - New ordering of task IDs
2970    ///
2971    /// # Errors
2972    ///
2973    /// Returns an error if reordering fails.
2974    pub async fn reorder(&self, task_ids: Vec<crdt::TaskId>) -> error::Result<()> {
2975        let delta = {
2976            let mut list = self.sync.write().await;
2977            list.reorder(task_ids.clone(), self.peer_id).map_err(|e| {
2978                error::IdentityError::Storage(std::io::Error::other(format!(
2979                    "reorder failed: {}",
2980                    e
2981                )))
2982            })?;
2983            crdt::TaskListDelta::for_reorder(task_ids, list.current_version())
2984        };
2985        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
2986            tracing::warn!("failed to publish reorder delta: {}", e);
2987        }
2988        Ok(())
2989    }
2990}
2991
2992// ---------------------------------------------------------------------------
2993// KvStore API
2994// ---------------------------------------------------------------------------
2995
2996impl Agent {
2997    /// Create a new key-value store.
2998    ///
2999    /// The store is automatically synchronized to all peers subscribed
3000    /// to the same `topic` via gossip delta propagation.
3001    ///
3002    /// # Errors
3003    ///
3004    /// Returns an error if the gossip runtime is not initialized.
3005    pub async fn create_kv_store(&self, name: &str, topic: &str) -> error::Result<KvStoreHandle> {
3006        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3007            error::IdentityError::Storage(std::io::Error::other(
3008                "gossip runtime not initialized - configure agent with network first",
3009            ))
3010        })?;
3011
3012        let peer_id = runtime.peer_id();
3013        let store_id = kv::KvStoreId::from_content(name, &self.agent_id());
3014        let store = kv::KvStore::new(
3015            store_id,
3016            name.to_string(),
3017            self.agent_id(),
3018            kv::AccessPolicy::Signed,
3019        );
3020
3021        let sync = kv::KvStoreSync::new(
3022            store,
3023            std::sync::Arc::clone(runtime.pubsub()),
3024            topic.to_string(),
3025            30,
3026        )
3027        .map_err(|e| {
3028            error::IdentityError::Storage(std::io::Error::other(format!(
3029                "kv store sync creation failed: {e}",
3030            )))
3031        })?;
3032
3033        let sync = std::sync::Arc::new(sync);
3034        sync.start().await.map_err(|e| {
3035            error::IdentityError::Storage(std::io::Error::other(format!(
3036                "kv store sync start failed: {e}",
3037            )))
3038        })?;
3039
3040        Ok(KvStoreHandle {
3041            sync,
3042            agent_id: self.agent_id(),
3043            peer_id,
3044        })
3045    }
3046
3047    /// Join an existing key-value store by topic.
3048    ///
3049    /// Creates an empty store that will be populated via delta sync
3050    /// from peers already sharing the topic. The access policy will
3051    /// be learned from the first full delta received from the owner.
3052    ///
3053    /// # Errors
3054    ///
3055    /// Returns an error if the gossip runtime is not initialized.
3056    pub async fn join_kv_store(&self, topic: &str) -> error::Result<KvStoreHandle> {
3057        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3058            error::IdentityError::Storage(std::io::Error::other(
3059                "gossip runtime not initialized - configure agent with network first",
3060            ))
3061        })?;
3062
3063        let peer_id = runtime.peer_id();
3064        let store_id = kv::KvStoreId::from_content(topic, &self.agent_id());
3065        // Use Encrypted as the most permissive default — the actual policy
3066        // will be set when the first delta from the owner arrives.
3067        let store = kv::KvStore::new(
3068            store_id,
3069            String::new(),
3070            self.agent_id(),
3071            kv::AccessPolicy::Encrypted {
3072                group_id: Vec::new(),
3073            },
3074        );
3075
3076        let sync = kv::KvStoreSync::new(
3077            store,
3078            std::sync::Arc::clone(runtime.pubsub()),
3079            topic.to_string(),
3080            30,
3081        )
3082        .map_err(|e| {
3083            error::IdentityError::Storage(std::io::Error::other(format!(
3084                "kv store sync creation failed: {e}",
3085            )))
3086        })?;
3087
3088        let sync = std::sync::Arc::new(sync);
3089        sync.start().await.map_err(|e| {
3090            error::IdentityError::Storage(std::io::Error::other(format!(
3091                "kv store sync start failed: {e}",
3092            )))
3093        })?;
3094
3095        Ok(KvStoreHandle {
3096            sync,
3097            agent_id: self.agent_id(),
3098            peer_id,
3099        })
3100    }
3101}
3102
3103/// Handle for interacting with a replicated key-value store.
3104///
3105/// Provides async methods for putting, getting, and removing entries.
3106/// Changes are automatically replicated to peers via gossip.
3107#[derive(Clone)]
3108pub struct KvStoreHandle {
3109    sync: std::sync::Arc<kv::KvStoreSync>,
3110    agent_id: identity::AgentId,
3111    peer_id: saorsa_gossip_types::PeerId,
3112}
3113
3114impl std::fmt::Debug for KvStoreHandle {
3115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3116        f.debug_struct("KvStoreHandle")
3117            .field("agent_id", &self.agent_id)
3118            .field("peer_id", &self.peer_id)
3119            .finish_non_exhaustive()
3120    }
3121}
3122
3123impl KvStoreHandle {
3124    /// Put a key-value pair into the store.
3125    ///
3126    /// If the key already exists, the value is updated. Changes are
3127    /// automatically replicated to peers via gossip.
3128    ///
3129    /// # Errors
3130    ///
3131    /// Returns an error if the value exceeds the maximum inline size (64 KB).
3132    pub async fn put(
3133        &self,
3134        key: String,
3135        value: Vec<u8>,
3136        content_type: String,
3137    ) -> error::Result<()> {
3138        let delta = {
3139            let mut store = self.sync.write().await;
3140            store
3141                .put(
3142                    key.clone(),
3143                    value.clone(),
3144                    content_type.clone(),
3145                    self.peer_id,
3146                )
3147                .map_err(|e| {
3148                    error::IdentityError::Storage(std::io::Error::other(format!(
3149                        "kv put failed: {e}",
3150                    )))
3151                })?;
3152            let entry = store.get(&key).cloned();
3153            let version = store.current_version();
3154            match entry {
3155                Some(e) => {
3156                    kv::KvStoreDelta::for_put(key, e, (self.peer_id, store.next_seq()), version)
3157                }
3158                None => return Ok(()), // shouldn't happen after successful put
3159            }
3160        };
3161        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3162            tracing::warn!("failed to publish kv put delta: {e}");
3163        }
3164        Ok(())
3165    }
3166
3167    /// Get a value by key.
3168    ///
3169    /// Returns `None` if the key does not exist or has been removed.
3170    ///
3171    /// # Errors
3172    ///
3173    /// Returns an error if the store cannot be read.
3174    pub async fn get(&self, key: &str) -> error::Result<Option<KvEntrySnapshot>> {
3175        let store = self.sync.read().await;
3176        Ok(store.get(key).map(|e| KvEntrySnapshot {
3177            key: e.key.clone(),
3178            value: e.value.clone(),
3179            content_hash: hex::encode(e.content_hash),
3180            content_type: e.content_type.clone(),
3181            metadata: e.metadata.clone(),
3182            created_at: e.created_at,
3183            updated_at: e.updated_at,
3184        }))
3185    }
3186
3187    /// Remove a key from the store.
3188    ///
3189    /// # Errors
3190    ///
3191    /// Returns an error if the key does not exist.
3192    pub async fn remove(&self, key: &str) -> error::Result<()> {
3193        let delta = {
3194            let mut store = self.sync.write().await;
3195            store.remove(key).map_err(|e| {
3196                error::IdentityError::Storage(std::io::Error::other(format!(
3197                    "kv remove failed: {e}",
3198                )))
3199            })?;
3200            let mut d = kv::KvStoreDelta::new(store.current_version());
3201            d.removed
3202                .insert(key.to_string(), std::collections::HashSet::new());
3203            d
3204        };
3205        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
3206            tracing::warn!("failed to publish kv remove delta: {e}");
3207        }
3208        Ok(())
3209    }
3210
3211    /// List all active keys in the store.
3212    ///
3213    /// # Errors
3214    ///
3215    /// Returns an error if the store cannot be read.
3216    pub async fn keys(&self) -> error::Result<Vec<KvEntrySnapshot>> {
3217        let store = self.sync.read().await;
3218        Ok(store
3219            .active_entries()
3220            .into_iter()
3221            .map(|e| KvEntrySnapshot {
3222                key: e.key.clone(),
3223                value: e.value.clone(),
3224                content_hash: hex::encode(e.content_hash),
3225                content_type: e.content_type.clone(),
3226                metadata: e.metadata.clone(),
3227                created_at: e.created_at,
3228                updated_at: e.updated_at,
3229            })
3230            .collect())
3231    }
3232
3233    /// Get the store name.
3234    ///
3235    /// # Errors
3236    ///
3237    /// Returns an error if the store cannot be read.
3238    pub async fn name(&self) -> error::Result<String> {
3239        let store = self.sync.read().await;
3240        Ok(store.name().to_string())
3241    }
3242}
3243
/// Read-only snapshot of a single key-value store entry.
///
/// Returned by [`KvStoreHandle::get`] and [`KvStoreHandle::keys`]. All data
/// is copied out of the store, so a snapshot holds no lock on it.
///
/// Note: the `serde::Serialize` derive emits fields in declaration order, so
/// reordering the fields below changes the serialized output.
#[derive(Debug, Clone, serde::Serialize)]
pub struct KvEntrySnapshot {
    /// The key.
    pub key: String,
    /// The value bytes.
    pub value: Vec<u8>,
    /// BLAKE3 hash of the value (hex-encoded).
    pub content_hash: String,
    /// Content type (MIME).
    pub content_type: String,
    /// User metadata.
    pub metadata: std::collections::HashMap<String, String>,
    /// Unix milliseconds when created.
    pub created_at: u64,
    /// Unix milliseconds when last updated.
    pub updated_at: u64,
}
3262
/// Read-only snapshot of a task's current state.
///
/// This is returned by `TaskListHandle::list_tasks()` and hides CRDT
/// internals, providing a clean API surface. All fields are owned copies,
/// so a snapshot does not change if the underlying task list is updated.
#[derive(Debug, Clone)]
pub struct TaskSnapshot {
    /// Unique task identifier.
    pub id: crdt::TaskId,
    /// Task title.
    pub title: String,
    /// Task description.
    pub description: String,
    /// Current checkbox state (Empty, Claimed, or Done).
    pub state: crdt::CheckboxState,
    /// Agent assigned to this task (if any).
    pub assignee: Option<identity::AgentId>,
    /// Human owner of the agent that created this task (if known).
    pub owner: Option<identity::UserId>,
    /// Task priority (0-255, higher = more important).
    pub priority: u8,
}
3284
3285/// The x0x protocol version.
3286pub const VERSION: &str = env!("CARGO_PKG_VERSION");
3287
/// The name: "x", "0", "x" — three ASCII bytes that read the same in either
/// direction. Assembled at compile time into the literal `"x0x"`.
pub const NAME: &str = concat!("x", "0", "x");
3290
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn name_is_palindrome() {
        let name = NAME;
        let reversed: String = name.chars().rev().collect();
        assert_eq!(name, reversed, "x0x must be a palindrome");
    }

    #[test]
    fn name_is_three_bytes() {
        assert_eq!(NAME.len(), 3, "x0x must be exactly three bytes");
    }

    #[test]
    fn name_is_ai_native() {
        // No uppercase, no spaces, no special chars that conflict
        // with shell, YAML, Markdown, or URL encoding: only lowercase
        // ASCII letters and ASCII digits are allowed.
        assert!(
            NAME.chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit()),
            "x0x must contain only lowercase ASCII letters and digits"
        );
    }

    #[tokio::test]
    async fn agent_creates() {
        let agent = Agent::new().await;
        assert!(agent.is_ok());
    }

    #[tokio::test]
    async fn agent_joins_network() {
        let agent = Agent::new().await.unwrap();
        assert!(agent.join_network().await.is_ok());
    }

    #[tokio::test]
    async fn agent_subscribes() {
        let agent = Agent::new().await.unwrap();
        // NOTE(review): `Agent::new()` presumably builds an agent without a
        // network/gossip runtime, so `subscribe` is expected to fail here —
        // confirm this is still the intended contract.
        assert!(agent.subscribe("test-topic").await.is_err());
    }

    #[tokio::test]
    async fn identity_announcement_machine_signature_verifies() {
        let agent = Agent::builder()
            .with_network_config(network::NetworkConfig::default())
            .build()
            .await
            .unwrap();

        let announcement = agent.build_identity_announcement(false, false).unwrap();
        assert_eq!(announcement.agent_id, agent.agent_id());
        assert_eq!(announcement.machine_id, agent.machine_id());
        assert!(announcement.user_id.is_none());
        assert!(announcement.agent_certificate.is_none());
        assert!(announcement.verify().is_ok());
    }

    #[tokio::test]
    async fn identity_announcement_requires_human_consent() {
        let agent = Agent::builder()
            .with_network_config(network::NetworkConfig::default())
            .build()
            .await
            .unwrap();

        let err = agent.build_identity_announcement(true, false).unwrap_err();
        assert!(
            err.to_string().contains("explicit human consent"),
            "unexpected error: {err}"
        );
    }

    #[tokio::test]
    async fn identity_announcement_with_user_requires_user_identity() {
        let agent = Agent::builder()
            .with_network_config(network::NetworkConfig::default())
            .build()
            .await
            .unwrap();

        let err = agent.build_identity_announcement(true, true).unwrap_err();
        assert!(
            err.to_string().contains("no user identity is configured"),
            "unexpected error: {err}"
        );
    }

    #[tokio::test]
    async fn announce_identity_populates_discovery_cache() {
        let user_key = identity::UserKeypair::generate().unwrap();
        let agent = Agent::builder()
            .with_network_config(network::NetworkConfig::default())
            .with_user_key(user_key)
            .build()
            .await
            .unwrap();

        agent.announce_identity(true, true).await.unwrap();
        let discovered = agent.discovered_agent(agent.agent_id()).await.unwrap();
        let entry = discovered.expect("agent should discover its own announcement");

        assert_eq!(entry.agent_id, agent.agent_id());
        assert_eq!(entry.machine_id, agent.machine_id());
        assert_eq!(entry.user_id, agent.user_id());
    }

    /// Documents that adding the NAT fields is a wire-format break: an
    /// announcement serialized without them (as produced by pre-1.3 nodes)
    /// does NOT deserialise as the current struct under bincode 1.x. Making
    /// the new fields `Option` does not help, because bincode 1.x is not
    /// self-describing and the old bytes end before those fields are read.
    #[test]
    fn identity_announcement_backward_compat_no_nat_fields() {
        use identity::{AgentId, MachineId};

        // Build an announcement that omits the nat_* fields by serializing an old-style
        // struct that matches the pre-1.3 wire format.
        #[derive(serde::Serialize, serde::Deserialize)]
        struct OldIdentityAnnouncementUnsigned {
            agent_id: AgentId,
            machine_id: MachineId,
            user_id: Option<identity::UserId>,
            agent_certificate: Option<identity::AgentCertificate>,
            machine_public_key: Vec<u8>,
            addresses: Vec<std::net::SocketAddr>,
            announced_at: u64,
        }

        let agent_id = AgentId([1u8; 32]);
        let machine_id = MachineId([2u8; 32]);
        let old = OldIdentityAnnouncementUnsigned {
            agent_id,
            machine_id,
            user_id: None,
            agent_certificate: None,
            machine_public_key: vec![0u8; 10],
            addresses: Vec::new(),
            announced_at: 1234,
        };
        let bytes = bincode::serialize(&old).expect("serialize old announcement");

        // Deserialize the old bytes as the new struct. bincode 1.x is not
        // self-describing, so adding fields to a struct changes the wire
        // format even when the new fields are `Option<T>`:
        // Old format -> new struct: fails, because the input ends before the
        // extra fields are read.
        // New format -> old struct: would leave trailing bytes.
        // This is acceptable — we document the protocol change.
        let result = bincode::deserialize::<IdentityAnnouncementUnsigned>(&bytes);
        // Old nodes produce shorter messages; new nodes cannot decode them as new structs.
        // This confirms the protocol is not transparent — nodes must upgrade together.
        assert!(
            result.is_err(),
            "Old-format announcement should not decode as new struct (protocol upgrade required)"
        );
    }

    /// A new announcement with all NAT fields set round-trips through bincode.
    #[test]
    fn identity_announcement_nat_fields_round_trip() {
        use identity::{AgentId, MachineId};

        let unsigned = IdentityAnnouncementUnsigned {
            agent_id: AgentId([1u8; 32]),
            machine_id: MachineId([2u8; 32]),
            user_id: None,
            agent_certificate: None,
            machine_public_key: vec![0u8; 10],
            addresses: Vec::new(),
            announced_at: 9999,
            nat_type: Some("FullCone".to_string()),
            can_receive_direct: Some(true),
            is_relay: Some(false),
            is_coordinator: Some(true),
        };
        let bytes = bincode::serialize(&unsigned).expect("serialize");
        let decoded: IdentityAnnouncementUnsigned =
            bincode::deserialize(&bytes).expect("deserialize");
        assert_eq!(decoded.nat_type.as_deref(), Some("FullCone"));
        assert_eq!(decoded.can_receive_direct, Some(true));
        assert_eq!(decoded.is_relay, Some(false));
        assert_eq!(decoded.is_coordinator, Some(true));
    }

    /// An announcement with None for all NAT fields (e.g. network not started)
    /// round-trips correctly.
    #[test]
    fn identity_announcement_no_nat_fields_round_trip() {
        use identity::{AgentId, MachineId};

        let unsigned = IdentityAnnouncementUnsigned {
            agent_id: AgentId([3u8; 32]),
            machine_id: MachineId([4u8; 32]),
            user_id: None,
            agent_certificate: None,
            machine_public_key: vec![0u8; 10],
            addresses: Vec::new(),
            announced_at: 42,
            nat_type: None,
            can_receive_direct: None,
            is_relay: None,
            is_coordinator: None,
        };
        let bytes = bincode::serialize(&unsigned).expect("serialize");
        let decoded: IdentityAnnouncementUnsigned =
            bincode::deserialize(&bytes).expect("deserialize");
        assert!(decoded.nat_type.is_none());
        assert!(decoded.can_receive_direct.is_none());
        assert!(decoded.is_relay.is_none());
        assert!(decoded.is_coordinator.is_none());
    }
}