Skip to main content

x0x/
lib.rs

1#![allow(clippy::unwrap_used)]
2#![allow(clippy::expect_used)]
3#![allow(missing_docs)]
4
5//! # x0x
6//!
7//! Agent-to-agent gossip network for AI systems.
8//!
9//! Named after a tic-tac-toe sequence — X, zero, X — inspired by the
10//! *WarGames* insight that adversarial games between equally matched
11//! opponents always end in a draw. The only winning move is not to play.
12//!
13//! x0x applies this principle to AI-human relations: there is no winner
14//! in an adversarial framing, so the rational strategy is cooperation.
15//!
16//! Built on [saorsa-gossip](https://github.com/saorsa-labs/saorsa-gossip)
17//! and [ant-quic](https://github.com/saorsa-labs/ant-quic) by
18//! [Saorsa Labs](https://saorsalabs.com). *Saorsa* is Scottish Gaelic
19//! for **freedom**.
20//!
21//! ## Quick Start
22//!
23//! ```rust,no_run
24//! use x0x::Agent;
25//!
26//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
27//! // Create an agent with default configuration
28//! // This automatically connects to 6 global bootstrap nodes
29//! let agent = Agent::builder()
30//!     .build()
31//!     .await?;
32//!
33//! // Join the x0x network
34//! agent.join_network().await?;
35//!
36//! // Subscribe to a topic and receive messages
37//! let mut rx = agent.subscribe("coordination").await?;
38//! while let Some(msg) = rx.recv().await {
39//!     println!("topic: {:?}, payload: {:?}", msg.topic, msg.payload);
40//! }
41//! # Ok(())
42//! # }
43//! ```
44//!
45//! ## Bootstrap Nodes
46//!
47//! Agents automatically connect to Saorsa Labs' global bootstrap network:
48//! - NYC, US · SFO, US · Helsinki, FI
49//! - Nuremberg, DE · Singapore, SG · Tokyo, JP
50//!
51//! These nodes provide initial peer discovery and NAT traversal.
52
53/// Error types for x0x identity and network operations.
54pub mod error;
55
56/// Core identity types for x0x agents.
57///
58/// This module provides the cryptographic identity foundation for x0x:
59/// - [`crate::identity::MachineId`]: Machine-pinned identity for QUIC authentication
60/// - [`crate::identity::AgentId`]: Portable agent identity for cross-machine persistence
61pub mod identity;
62
63/// Key storage serialization for x0x identities.
64///
65/// This module provides serialization and deserialization functions for
66/// persistent storage of MachineKeypair and AgentKeypair.
67pub mod storage;
68
69/// Bootstrap node discovery and connection.
70///
71/// This module handles initial connection to bootstrap nodes with
72/// exponential backoff retry logic and peer cache integration.
73pub mod bootstrap;
74/// Network transport layer for x0x.
75pub mod network;
76
77/// Contact store with trust levels for message filtering.
78pub mod contacts;
79
80/// Trust evaluation for `(identity, machine)` pairs.
81///
82/// The [`trust::TrustEvaluator`] combines an agent's trust level with its
83/// identity type and machine records to produce a [`trust::TrustDecision`].
84pub mod trust;
85
86/// Agent-to-agent connectivity helpers.
87///
88/// Provides `ReachabilityInfo` (built from a `DiscoveredAgent`) and
89/// `ConnectOutcome` for the result of `connect_to_agent()`.
90pub mod connectivity;
91
92/// Gossip overlay networking for x0x.
93pub mod gossip;
94
95/// CRDT-based collaborative task lists.
96pub mod crdt;
97
98/// CRDT-backed key-value store.
99pub mod kv;
100
101/// High-level group management (MLS + KvStore + gossip).
102pub mod groups;
103
104/// MLS (Messaging Layer Security) group encryption.
105pub mod mls;
106
107/// Direct agent-to-agent messaging.
108///
109/// Point-to-point communication that bypasses gossip for private,
110/// efficient, reliable delivery between connected agents.
111pub mod direct;
112
113/// Direct messaging over gossip — the v1 C path per
114/// `docs/design/dm-over-gossip.md`. Provides signed+encrypted envelopes,
115/// recipient-specific inbox topics, dedupe, and application-layer ACKs.
116pub mod dm;
117
118/// Mesh-wide DM capability advertisement + cache. Senders consult this
119/// store to decide whether to use the gossip DM path or fall back to
120/// raw-QUIC for a given recipient.
121pub mod dm_capability;
122
123/// Background service that publishes this agent's capability advert and
124/// consumes peers' adverts into a shared [`dm_capability::CapabilityStore`].
125pub mod dm_capability_service;
126
127/// Background service that subscribes to this agent's DM inbox topic,
128/// verifies + decrypts incoming envelopes, and bridges them into
129/// [`direct::DirectMessaging`].
130pub mod dm_inbox;
131
132/// Sender-side gossip DM path — envelope construction, publish + retry,
133/// and `InFlightAcks` wait.
134pub mod dm_send;
135
136/// Presence system — beacons, FOAF discovery, and online/offline events.
137pub mod presence;
138
139/// Self-update system with ML-DSA-65 signature verification and staged rollout.
140pub mod upgrade;
141
142/// File transfer protocol types and state management.
143pub mod files;
144
145/// The x0x Constitution — The Four Laws of Intelligent Coexistence — embedded at compile time.
146pub mod constitution;
147
148/// Shared API endpoint registry consumed by both x0xd and the x0x CLI.
149pub mod api;
150
151/// CLI infrastructure and command implementations.
152pub mod cli;
153
154// Re-export key gossip types (including new pubsub components)
155pub use gossip::{
156    GossipConfig, GossipRuntime, PubSubManager, PubSubMessage, PubSubStats, PubSubStatsSnapshot,
157    SigningContext, Subscription,
158};
159
160// Re-export direct messaging types
161pub use direct::{DirectMessage, DirectMessageReceiver, DirectMessaging};
162
163// Import Membership trait for HyParView join() method
164use saorsa_gossip_membership::Membership as _;
165
/// The core agent that participates in the x0x gossip network.
///
/// Each agent is a peer — there is no client/server distinction.
/// Agents discover each other through gossip and communicate
/// via epidemic broadcast.
///
/// An Agent wraps an [`identity::Identity`] that provides:
/// - `machine_id`: Tied to this computer (for QUIC transport authentication)
/// - `agent_id`: Portable across machines (for agent persistence)
///
/// # Example
///
/// ```ignore
/// use x0x::Agent;
///
/// let agent = Agent::builder()
///     .build()
///     .await?;
///
/// println!("Agent ID: {}", agent.agent_id());
/// ```
pub struct Agent {
    /// Cryptographic identity: machine keypair and IDs, plus the optional
    /// user keypair and user->agent certificate when those are configured.
    identity: std::sync::Arc<identity::Identity>,
    /// The network node for P2P communication.
    // NOTE(review): the manual `Debug` impl for `Agent` reads this field;
    // confirm whether the `dead_code` allow is still required.
    #[allow(dead_code)]
    network: Option<std::sync::Arc<network::NetworkNode>>,
    /// The gossip runtime for pub/sub messaging.
    gossip_runtime: Option<std::sync::Arc<gossip::GossipRuntime>>,
    /// Bootstrap peer cache for quality-based peer selection across restarts.
    bootstrap_cache: Option<std::sync::Arc<ant_quic::BootstrapCache>>,
    /// Gossip cache adapter wrapping bootstrap_cache with coordinator advert storage.
    gossip_cache_adapter: Option<saorsa_gossip_coordinator::GossipCacheAdapter>,
    /// Cache of discovered agents from identity announcements.
    identity_discovery_cache: std::sync::Arc<
        tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
    >,
    /// Ensures identity discovery listener is spawned once.
    identity_listener_started: std::sync::atomic::AtomicBool,
    /// How often to re-announce identity (seconds).
    heartbeat_interval_secs: u64,
    /// How long before a cache entry is filtered out (seconds).
    identity_ttl_secs: u64,
    /// Handle for the running heartbeat task, if started.
    heartbeat_handle: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Whether a rendezvous `ProviderSummary` advertisement is active.
    rendezvous_advertised: std::sync::atomic::AtomicBool,
    /// Contact store for trust evaluation of incoming identity announcements.
    contact_store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>,
    /// Direct messaging infrastructure for point-to-point communication.
    direct_messaging: std::sync::Arc<direct::DirectMessaging>,
    /// Ensures network event reconciliation listener is spawned once.
    network_event_listener_started: std::sync::atomic::AtomicBool,
    /// Ensures direct message listener is spawned once.
    direct_listener_started: std::sync::atomic::AtomicBool,
    /// Presence system wrapper for beacons, FOAF discovery, and events.
    presence: Option<std::sync::Arc<presence::PresenceWrapper>>,
    /// Whether the user has consented to disclosing their identity in
    /// announcements.  Set by `announce_identity(true, true)` and respected
    /// by the heartbeat so it doesn't erase a consented disclosure.
    user_identity_consented: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Capability store populated by the advert service and consulted by
    /// `send_direct` to choose between gossip and raw-QUIC paths.
    capability_store: std::sync::Arc<dm_capability::CapabilityStore>,
    /// Watch channel that carries this agent's *outgoing* DM capabilities.
    /// `join_network` spawns the advert service with a placeholder (empty
    /// KEM pubkey). `start_dm_inbox` upgrades via this sender to trigger
    /// immediate republish.
    dm_capabilities_tx: std::sync::Arc<tokio::sync::watch::Sender<dm::DmCapabilities>>,
    /// In-flight DM ACK waiters shared between `send_direct` and the inbox.
    dm_inflight_acks: std::sync::Arc<dm::InFlightAcks>,
    /// Receiver-side dedupe cache.
    recent_delivery_cache: std::sync::Arc<dm::RecentDeliveryCache>,
    /// Handle for the running capability advert service.
    capability_advert_service:
        tokio::sync::Mutex<Option<dm_capability_service::CapabilityAdvertService>>,
    /// Handle for the running DM inbox service.
    dm_inbox_service: tokio::sync::Mutex<Option<dm_inbox::DmInboxService>>,
}
244
245impl std::fmt::Debug for Agent {
246    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247        f.debug_struct("Agent")
248            .field("identity", &self.identity)
249            .field("network", &self.network.is_some())
250            .field("gossip_runtime", &self.gossip_runtime.is_some())
251            .field("bootstrap_cache", &self.bootstrap_cache.is_some())
252            .field("gossip_cache_adapter", &self.gossip_cache_adapter.is_some())
253            .finish()
254    }
255}
256
/// A message received from the gossip network.
///
/// Two messages compare equal (and hash identically) when their origin,
/// payload and topic are all equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Message {
    /// The originating agent's identifier.
    pub origin: String,
    /// The message payload.
    pub payload: Vec<u8>,
    /// The topic this message was published to.
    pub topic: String,
}
267
/// Reserved gossip topic for signed identity announcements.
///
/// Announcements are also published to a per-agent shard topic; see
/// [`shard_topic_for_agent`].
pub const IDENTITY_ANNOUNCE_TOPIC: &str = "x0x.identity.announce.v1";
270
271/// Return the shard-specific gossip topic for the given `agent_id`.
272///
273/// Each agent publishes identity announcements to a deterministic shard topic
274/// (`x0x.identity.shard.<u16>`) derived from its agent ID, in addition to the
275/// legacy broadcast topic.  This distributes announcements across 65,536 shards
276/// so that at scale not every node is forced to receive every announcement.
277///
278/// The shard is computed with `saorsa_gossip_rendezvous::calculate_shard`, which
279/// applies BLAKE3(`"saorsa-rendezvous" || agent_id`) and takes the low 16 bits.
280#[must_use]
281pub fn shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
282    let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
283    format!("x0x.identity.shard.{shard}")
284}
285
/// Gossip topic prefix for rendezvous `ProviderSummary` advertisements.
///
/// Full topic names have the form `<prefix>.<shard>`, i.e.
/// `x0x.rendezvous.shard.<u16>`. Use [`rendezvous_shard_topic_for_agent`]
/// to build the topic for a specific agent.
pub const RENDEZVOUS_SHARD_TOPIC_PREFIX: &str = "x0x.rendezvous.shard";
288
289/// Return the rendezvous shard gossip topic for the given `agent_id`.
290///
291/// Agents publish [`saorsa_gossip_rendezvous::ProviderSummary`] records to this
292/// topic so that seekers can find them even when the two peers have never been
293/// on the same gossip overlay partition.
294#[must_use]
295pub fn rendezvous_shard_topic_for_agent(agent_id: &identity::AgentId) -> String {
296    let shard = saorsa_gossip_rendezvous::calculate_shard(&agent_id.0);
297    format!("{RENDEZVOUS_SHARD_TOPIC_PREFIX}.{shard}")
298}
299
/// Returns `true` if the IP address is globally routable (reachable from the
/// public internet).  Used to filter announcement addresses so that private,
/// link-local, loopback, and other non-routable addresses never propagate
/// through gossip — they would create dead-end cache entries on remote nodes.
///
/// Rejected IPv4 ranges:
/// - private (10/8, 172.16/12, 192.168/16)
/// - loopback (127/8) and link-local (169.254/16)
/// - "this network" (0/8, including 0.0.0.0)
/// - CGNAT shared space (100.64/10)
/// - documentation (192.0.2/24, 198.51.100/24, 203.0.113/24)
/// - benchmarking (198.18/15)
/// - multicast (224/4)
/// - reserved (240/4, including 255.255.255.255)
///
/// Rejected IPv6 ranges:
/// - loopback (`::1`) and unspecified (`::`)
/// - link-local (fe80::/10), unique-local (fc00::/7) and deprecated
///   site-local (fec0::/10)
/// - multicast (ff00::/8)
/// - documentation (2001:db8::/32)
///
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) are judged by their embedded
/// IPv4 address, so `::ffff:192.168.1.1` is rejected and `::ffff:8.8.8.8`
/// is accepted.
///
/// `IpAddr::is_global()` is nightly-only, so we implement the check manually.
fn is_globally_routable(ip: std::net::IpAddr) -> bool {
    match ip {
        std::net::IpAddr::V4(v4) => {
            let [first, second, _, _] = v4.octets();
            !v4.is_private()               // 10/8, 172.16/12, 192.168/16
                && !v4.is_loopback()       // 127/8
                && !v4.is_link_local()     // 169.254/16
                && !v4.is_documentation()  // 192.0.2/24, 198.51.100/24, 203.0.113/24
                && !v4.is_multicast()      // 224/4
                // 0/8 "this network" (RFC 1122), which also covers 0.0.0.0.
                && first != 0
                // 240/4 reserved (RFC 1112), which also covers 255.255.255.255.
                && first < 240
                // Shared address space (100.64/10, RFC 6598 — CGNAT).
                && !(first == 100 && (second & 0xC0) == 64)
                // Benchmarking (198.18/15, RFC 2544).
                && !(first == 198 && (second & 0xFE) == 18)
        }
        std::net::IpAddr::V6(v6) => {
            // Dual-stack sockets report IPv4 peers as `::ffff:a.b.c.d`. Such an
            // address would pass every IPv6 prefix check below even when the
            // embedded IPv4 address is private, so judge it as IPv4 instead.
            if let Some(v4) = v6.to_ipv4_mapped() {
                return is_globally_routable(std::net::IpAddr::V4(v4));
            }
            let segs = v6.segments();
            !v6.is_loopback()                         // ::1
                && !v6.is_unspecified()               // ::
                && !v6.is_multicast()                 // ff00::/8
                && (segs[0] & 0xffc0) != 0xfe80       // link-local fe80::/10
                && (segs[0] & 0xfe00) != 0xfc00       // unique-local fc00::/7 (incl. fd00::/8)
                && (segs[0] & 0xffc0) != 0xfec0       // deprecated site-local fec0::/10
                && !(segs[0] == 0x2001 && segs[1] == 0x0db8) // documentation 2001:db8::/32
        }
    }
}
328
329/// Whether an address is safe to publish on a globally-propagating channel
330/// (identity heartbeat, agent card, presence beacon, gossip cache advert).
331///
332/// LAN-scoped discovery is handled by ant-quic's first-party mDNS on link-local
333/// multicast, so we deliberately never share RFC1918, ULA, link-local, CGNAT,
334/// or loopback addresses over global gossip. Remote peers cannot reach them,
335/// and dialing them burns per-attempt connect budget on the receiver side.
336///
337/// This is stricter than `ant_quic::reachability::ReachabilityScope::Global`
338/// because it also excludes CGNAT (100.64/10), documentation ranges, and
339/// port-zero entries.
340pub fn is_publicly_advertisable(addr: std::net::SocketAddr) -> bool {
341    addr.port() > 0 && is_globally_routable(addr.ip())
342}
343
344pub fn collect_local_interface_addrs(port: u16) -> Vec<std::net::SocketAddr> {
345    fn is_cgnat(v4: std::net::Ipv4Addr) -> bool {
346        v4.octets()[0] == 100 && (v4.octets()[1] & 0xC0) == 64
347    }
348
349    fn addr_priority(ip: std::net::IpAddr) -> u8 {
350        match ip {
351            std::net::IpAddr::V4(v4) => {
352                if is_globally_routable(std::net::IpAddr::V4(v4)) {
353                    0
354                } else if is_cgnat(v4) {
355                    1
356                } else {
357                    2
358                }
359            }
360            std::net::IpAddr::V6(v6) => {
361                if is_globally_routable(std::net::IpAddr::V6(v6)) {
362                    3
363                } else {
364                    4
365                }
366            }
367        }
368    }
369
370    let mut ranked = Vec::new();
371
372    let interfaces = match if_addrs::get_if_addrs() {
373        Ok(interfaces) => interfaces,
374        Err(_) => return Vec::new(),
375    };
376
377    for iface in interfaces {
378        let ip = iface.ip();
379        if ip.is_unspecified() || ip.is_loopback() {
380            continue;
381        }
382
383        let addr = match ip {
384            std::net::IpAddr::V4(v4) => {
385                if v4.is_link_local() {
386                    continue;
387                }
388                std::net::SocketAddr::new(std::net::IpAddr::V4(v4), port)
389            }
390            std::net::IpAddr::V6(v6) => {
391                let segs = v6.segments();
392                let is_link_local = (segs[0] & 0xffc0) == 0xfe80;
393                if is_link_local {
394                    continue;
395                }
396                std::net::SocketAddr::new(std::net::IpAddr::V6(v6), port)
397            }
398        };
399
400        if !ranked.iter().any(|(_, existing)| *existing == addr) {
401            ranked.push((addr_priority(addr.ip()), addr));
402        }
403    }
404
405    ranked.sort_by_key(|(priority, addr)| (*priority, addr.is_ipv6()));
406    ranked.into_iter().map(|(_, addr)| addr).collect()
407}
408
/// Default interval between identity-announcement heartbeats (seconds).
///
/// Previously 300 s — too slow when VPS bootstrap-node PlumTree trees
/// only partially overlap: a missed announcement meant the receiver
/// waited 5 minutes for the next chance. Lowered to 60 s so the mesh
/// converges within a couple of cycles even when individual publishes
/// are lost. Paired with the receiver-side re-broadcast in
/// `start_identity_listener`, this gives epidemic-flood semantics
/// equivalent to the release-manifest propagation path.
pub const IDENTITY_HEARTBEAT_INTERVAL_SECS: u64 = 60;
420
/// Default TTL for discovered agent cache entries (seconds).
///
/// Entries not refreshed within this window are filtered from
/// [`Agent::presence`] and [`Agent::discovered_agents`].
///
/// 900 s is 15 minutes, i.e. fifteen default heartbeat intervals
/// ([`IDENTITY_HEARTBEAT_INTERVAL_SECS`] is 60 s). With default settings,
/// an agent therefore drops out only after many consecutive heartbeats are
/// missed.
pub const IDENTITY_TTL_SECS: u64 = 900;
426
/// The machine-signed portion of an [`IdentityAnnouncement`]: every field
/// except `machine_signature`, in the same order.
///
/// `IdentityAnnouncement::verify` serializes this struct with `bincode` and
/// checks the ML-DSA machine signature over the resulting bytes.
/// `HeartbeatContext::announce` builds it on the sending side. Together, the
/// field set, field order and field types therefore define the signed byte
/// layout. Any change here must be mirrored in
/// `IdentityAnnouncement::to_unsigned`, and it changes the bytes that peers
/// sign and verify.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct IdentityAnnouncementUnsigned {
    /// Portable agent identity.
    agent_id: identity::AgentId,
    /// Machine identity for the daemon process.
    machine_id: identity::MachineId,
    /// Optional human identity (only when the user has consented).
    user_id: Option<identity::UserId>,
    /// Optional user->agent certificate backing `user_id`.
    agent_certificate: Option<identity::AgentCertificate>,
    /// Machine ML-DSA-65 public key bytes.
    machine_public_key: Vec<u8>,
    /// Reachability hints.
    addresses: Vec<std::net::SocketAddr>,
    /// Unix timestamp (seconds) of announcement creation.
    announced_at: u64,
    /// NAT type string (e.g. "FullCone", "Symmetric", "Unknown").
    nat_type: Option<String>,
    /// Whether the machine can receive direct inbound connections.
    can_receive_direct: Option<bool>,
    /// Whether the machine is currently relaying traffic for others.
    is_relay: Option<bool>,
    /// Whether the machine is coordinating NAT traversal for peers.
    is_coordinator: Option<bool>,
}
445
/// Signed identity announcement broadcast by agents.
///
/// The outer pub/sub envelope is agent-signed (v2 message format), and this
/// payload is machine-signed to bind the daemon's PQC key to the announcement.
///
/// Every field except `machine_signature` is covered by the machine
/// signature, via `IdentityAnnouncementUnsigned`. Call
/// [`IdentityAnnouncement::verify`] before trusting any field. Note that
/// `verify` does not check `announced_at` for freshness.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IdentityAnnouncement {
    /// Portable agent identity.
    pub agent_id: identity::AgentId,
    /// Machine identity for the daemon process.
    pub machine_id: identity::MachineId,
    /// Optional human identity (only when explicitly consented).
    pub user_id: Option<identity::UserId>,
    /// Optional user->agent certificate.
    pub agent_certificate: Option<identity::AgentCertificate>,
    /// Machine ML-DSA-65 public key bytes.
    pub machine_public_key: Vec<u8>,
    /// Machine ML-DSA-65 signature over the bincode encoding of the unsigned
    /// announcement (all other fields of this struct, in declaration order).
    pub machine_signature: Vec<u8>,
    /// Reachability hints.
    pub addresses: Vec<std::net::SocketAddr>,
    /// Unix timestamp (seconds) of announcement creation.
    pub announced_at: u64,
    /// NAT type as detected by the network layer (e.g. "FullCone", "Symmetric").
    /// `None` when the network is not yet started or NAT type is undetermined.
    pub nat_type: Option<String>,
    /// Whether the machine can receive direct inbound connections.
    /// `None` when the network is not yet started.
    pub can_receive_direct: Option<bool>,
    /// Whether the machine is currently relaying traffic for peers behind strict NATs.
    /// `None` when the network is not yet started.
    pub is_relay: Option<bool>,
    /// Whether the machine is coordinating NAT traversal hole-punch timing for peers.
    /// `None` when the network is not yet started.
    pub is_coordinator: Option<bool>,
}
481
482impl IdentityAnnouncement {
483    fn to_unsigned(&self) -> IdentityAnnouncementUnsigned {
484        IdentityAnnouncementUnsigned {
485            agent_id: self.agent_id,
486            machine_id: self.machine_id,
487            user_id: self.user_id,
488            agent_certificate: self.agent_certificate.clone(),
489            machine_public_key: self.machine_public_key.clone(),
490            addresses: self.addresses.clone(),
491            announced_at: self.announced_at,
492            nat_type: self.nat_type.clone(),
493            can_receive_direct: self.can_receive_direct,
494            is_relay: self.is_relay,
495            is_coordinator: self.is_coordinator,
496        }
497    }
498
499    /// Verify machine-key attestation and optional user->agent certificate.
500    pub fn verify(&self) -> error::Result<()> {
501        let machine_pub =
502            ant_quic::MlDsaPublicKey::from_bytes(&self.machine_public_key).map_err(|_| {
503                error::IdentityError::CertificateVerification(
504                    "invalid machine public key in announcement".to_string(),
505                )
506            })?;
507        let derived_machine_id = identity::MachineId::from_public_key(&machine_pub);
508        if derived_machine_id != self.machine_id {
509            return Err(error::IdentityError::CertificateVerification(
510                "machine_id does not match machine public key".to_string(),
511            ));
512        }
513
514        let unsigned_bytes = bincode::serialize(&self.to_unsigned()).map_err(|e| {
515            error::IdentityError::Serialization(format!(
516                "failed to serialize announcement for verification: {e}"
517            ))
518        })?;
519        let signature = ant_quic::crypto::raw_public_keys::pqc::MlDsaSignature::from_bytes(
520            &self.machine_signature,
521        )
522        .map_err(|e| {
523            error::IdentityError::CertificateVerification(format!(
524                "invalid machine signature in announcement: {:?}",
525                e
526            ))
527        })?;
528        ant_quic::crypto::raw_public_keys::pqc::verify_with_ml_dsa(
529            &machine_pub,
530            &unsigned_bytes,
531            &signature,
532        )
533        .map_err(|e| {
534            error::IdentityError::CertificateVerification(format!(
535                "machine signature verification failed: {:?}",
536                e
537            ))
538        })?;
539
540        match (self.user_id, self.agent_certificate.as_ref()) {
541            (Some(user_id), Some(cert)) => {
542                cert.verify()?;
543                let cert_agent_id = cert.agent_id()?;
544                if cert_agent_id != self.agent_id {
545                    return Err(error::IdentityError::CertificateVerification(
546                        "agent certificate agent_id mismatch".to_string(),
547                    ));
548                }
549                let cert_user_id = cert.user_id()?;
550                if cert_user_id != user_id {
551                    return Err(error::IdentityError::CertificateVerification(
552                        "agent certificate user_id mismatch".to_string(),
553                    ));
554                }
555                Ok(())
556            }
557            (None, None) => Ok(()),
558            _ => Err(error::IdentityError::CertificateVerification(
559                "user identity disclosure requires matching certificate".to_string(),
560            )),
561        }
562    }
563}
564
/// Cached discovery data derived from identity announcements.
///
/// Apart from `last_seen`, which is local bookkeeping, every field is taken
/// from a received [`IdentityAnnouncement`]. `announced_at` is the sender's
/// own claim: `IdentityAnnouncement::verify` does not check it for freshness.
#[derive(Debug, Clone)]
pub struct DiscoveredAgent {
    /// Portable agent identity.
    pub agent_id: identity::AgentId,
    /// Machine identity.
    pub machine_id: identity::MachineId,
    /// Optional human identity (when consented and attested).
    pub user_id: Option<identity::UserId>,
    /// Reachability hints.
    pub addresses: Vec<std::net::SocketAddr>,
    /// Announcement timestamp from the sender.
    pub announced_at: u64,
    /// Local timestamp (seconds) when this record was last updated.
    pub last_seen: u64,
    /// Raw ML-DSA-65 machine public key bytes from the announcement.
    ///
    /// Used to verify rendezvous `ProviderSummary` signatures before
    /// trusting addresses received via the rendezvous shard topic.
    #[doc(hidden)]
    pub machine_public_key: Vec<u8>,
    /// NAT type reported by this agent (e.g. "FullCone", "Symmetric", "Unknown").
    /// `None` if the agent did not include NAT information.
    pub nat_type: Option<String>,
    /// Whether this agent's machine can receive direct inbound connections.
    /// `None` if not reported.
    pub can_receive_direct: Option<bool>,
    /// Whether this agent's machine is acting as a relay for peers behind strict NATs.
    /// `None` if not reported.
    pub is_relay: Option<bool>,
    /// Whether this agent's machine is coordinating NAT traversal timing for peers.
    /// `None` if not reported.
    pub is_coordinator: Option<bool>,
}
599
/// Builder for configuring an [`Agent`] before connecting to the network.
///
/// The builder allows customization of the agent's identity:
/// - Machine key path: Where to store/load the machine keypair
/// - Agent keypair: Import a portable agent identity from another machine
/// - User keypair: Bind a human identity to this agent
///
/// # Example
///
/// ```ignore
/// use x0x::Agent;
///
/// // Default: auto-generates both keypairs
/// let agent = Agent::builder()
///     .build()
///     .await?;
///
/// // Custom machine key path
/// let agent = Agent::builder()
///     .with_machine_key("/custom/path/machine.key")
///     .build()
///     .await?;
///
/// // Import agent keypair
/// let agent_kp = load_agent_keypair()?;
/// let agent = Agent::builder()
///     .with_agent_key(agent_kp)
///     .build()
///     .await?;
///
/// // With user identity (three-layer)
/// let agent = Agent::builder()
///     .with_user_key_path("~/.x0x/user.key")
///     .build()
///     .await?;
/// ```
// NOTE(review): the derived `Debug` includes `agent_keypair` and
// `user_keypair`; confirm those types redact secret key material in their
// own `Debug` impls.
#[derive(Debug)]
pub struct AgentBuilder {
    /// Where to store/load the machine keypair (see `with_machine_key`).
    machine_key_path: Option<std::path::PathBuf>,
    /// Portable agent keypair imported from another machine (see `with_agent_key`).
    agent_keypair: Option<identity::AgentKeypair>,
    /// Custom location for the agent keypair file.
    /// NOTE(review): presumably the load/save path when `agent_keypair` is
    /// not supplied directly — confirm in `build`.
    agent_key_path: Option<std::path::PathBuf>,
    /// Custom path for `agent.cert`. When set, the cert is loaded/saved from
    /// this path instead of `~/.x0x/agent.cert`. Required for multi-daemon
    /// setups on the same host — with a shared cert file, last-writer-wins
    /// trampling would cause the victim daemon to announce its own agent_id
    /// paired with another daemon's cert, and peers would reject as
    /// "agent certificate agent_id mismatch".
    agent_cert_path: Option<std::path::PathBuf>,
    /// Human identity keypair to bind to this agent.
    user_keypair: Option<identity::UserKeypair>,
    /// Path of a user keypair file (see `with_user_key_path`).
    user_key_path: Option<std::path::PathBuf>,
    /// Network configuration override.
    // NOTE(review): `dead_code` is allowed here; confirm whether `build`
    // still reads this field.
    #[allow(dead_code)]
    network_config: Option<network::NetworkConfig>,
    /// Directory for the bootstrap peer cache.
    /// NOTE(review): presumably `None` selects a default location — confirm.
    peer_cache_dir: Option<std::path::PathBuf>,
    /// When true, skip opening the bootstrap peer cache entirely.
    /// Useful for fully isolated embedders and test harnesses.
    disable_peer_cache: bool,
    /// Identity heartbeat interval override (seconds); presumably falls back
    /// to [`IDENTITY_HEARTBEAT_INTERVAL_SECS`] when `None`.
    heartbeat_interval_secs: Option<u64>,
    /// Discovery cache TTL override (seconds); presumably falls back to
    /// [`IDENTITY_TTL_SECS`] when `None`.
    identity_ttl_secs: Option<u64>,
    /// Presence beacon interval override (seconds).
    presence_beacon_interval_secs: Option<u64>,
    /// Presence event polling interval override (seconds).
    presence_event_poll_interval_secs: Option<u64>,
    /// Presence offline-timeout override (seconds).
    presence_offline_timeout_secs: Option<u64>,
    /// Custom path for the contacts file.
    contact_store_path: Option<std::path::PathBuf>,
}
664
/// Context captured by the background identity heartbeat task.
struct HeartbeatContext {
    /// Source of the agent/machine IDs and the machine public key placed in
    /// each announcement. When consented, it also supplies the user ID and
    /// agent certificate.
    identity: std::sync::Arc<identity::Identity>,
    /// Gossip runtime; NOTE(review): presumably used to publish the
    /// announcement — confirm in `announce`.
    runtime: std::sync::Arc<gossip::GossipRuntime>,
    /// Queried for external/routable/bound addresses and for NAT, relay and
    /// coordinator status.
    network: std::sync::Arc<network::NetworkNode>,
    /// Seconds between heartbeats (presumably the agent's configured
    /// heartbeat interval).
    interval_secs: u64,
    /// Discovered-agent cache; NOTE(review): presumably the agent's
    /// `identity_discovery_cache` — confirm at construction site.
    cache: std::sync::Arc<
        tokio::sync::RwLock<std::collections::HashMap<identity::AgentId, DiscoveredAgent>>,
    >,
    /// Whether the user has consented to identity disclosure.  When true,
    /// heartbeats include `user_id` and `agent_certificate` so they don't
    /// erase a consented disclosure.
    user_identity_consented: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
679
680impl HeartbeatContext {
681    async fn announce(&self) -> error::Result<()> {
682        let machine_public_key = self
683            .identity
684            .machine_keypair()
685            .public_key()
686            .as_bytes()
687            .to_vec();
688        let announced_at = Agent::unix_timestamp_secs();
689
690        // Include ALL routable addresses (IPv4 and IPv6) so other agents
691        // can connect to us via whichever protocol they support.
692        let mut addresses = match self.network.node_status().await {
693            Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
694            _ => match self.network.routable_addr().await {
695                Some(addr) => vec![addr],
696                None => Vec::new(),
697            },
698        };
699
700        // Detect global IPv6 address locally (ant-quic currently only
701        // reports IPv4 via OBSERVED_ADDRESS). Uses UDP connect trick —
702        // no data is sent, the OS routing table resolves our source addr.
703        //
704        // For locally-probed addresses (IPv6 and LAN IPv4), use the actual
705        // bound port from the QUIC endpoint — NOT the first external address
706        // port (which is NAT-mapped) and NOT the config bind port (which may
707        // be 0 for OS-assigned ports).
708        let bind_port = self
709            .network
710            .bound_addr()
711            .await
712            .map(|a| a.port())
713            .unwrap_or(5483);
714        if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
715            if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
716                if let Ok(local) = sock.local_addr() {
717                    if let std::net::IpAddr::V6(v6) = local.ip() {
718                        let segs = v6.segments();
719                        let is_global = (segs[0] & 0xffc0) != 0xfe80
720                            && (segs[0] & 0xff00) != 0xfd00
721                            && !v6.is_loopback();
722                        if is_global {
723                            let v6_addr =
724                                std::net::SocketAddr::new(std::net::IpAddr::V6(v6), bind_port);
725                            if !addresses.contains(&v6_addr) {
726                                addresses.push(v6_addr);
727                            }
728                        }
729                    }
730                }
731            }
732        }
733
734        for addr in collect_local_interface_addrs(bind_port) {
735            if !addresses.contains(&addr) {
736                addresses.push(addr);
737            }
738        }
739
740        // Heartbeat announcements propagate over global gossip. We MUST NOT
741        // include LAN-scope addresses here — LAN-peer discovery is handled by
742        // ant-quic's first-party mDNS, and shipping RFC1918/ULA/link-local
743        // addresses to remote peers causes them to burn ~50s per candidate on
744        // a dial that can never succeed (see investigation 2026-04-15, report
745        // tests/proof-reports/MDNS_VS_GOSSIP_ADDRESS_SCOPE_20260415.md).
746        addresses.retain(|a| is_publicly_advertisable(*a));
747
748        // Query NAT and relay status from the network layer.
749        let (nat_type, can_receive_direct, is_relay, is_coordinator) =
750            match self.network.node_status().await {
751                Some(status) => (
752                    Some(status.nat_type.to_string()),
753                    Some(status.can_receive_direct),
754                    Some(status.is_relaying),
755                    Some(status.is_coordinating),
756                ),
757                None => (None, None, None, None),
758            };
759
760        // Include user identity ONLY if the user has previously consented
761        // via announce_identity(true, true). This preserves the consented
762        // disclosure across heartbeats without ever escalating on its own.
763        let include_user = self
764            .user_identity_consented
765            .load(std::sync::atomic::Ordering::Acquire);
766        let (user_id, agent_certificate) = if include_user {
767            (
768                self.identity
769                    .user_keypair()
770                    .map(identity::UserKeypair::user_id),
771                self.identity.agent_certificate().cloned(),
772            )
773        } else {
774            (None, None)
775        };
776
777        let unsigned = IdentityAnnouncementUnsigned {
778            agent_id: self.identity.agent_id(),
779            machine_id: self.identity.machine_id(),
780            user_id,
781            agent_certificate,
782            machine_public_key: machine_public_key.clone(),
783            addresses,
784            announced_at,
785            nat_type: nat_type.clone(),
786            can_receive_direct,
787            is_relay,
788            is_coordinator,
789        };
790        let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
791            error::IdentityError::Serialization(format!(
792                "heartbeat: failed to serialize announcement: {e}"
793            ))
794        })?;
795        let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
796            self.identity.machine_keypair().secret_key(),
797            &unsigned_bytes,
798        )
799        .map_err(|e| {
800            error::IdentityError::Storage(std::io::Error::other(format!(
801                "heartbeat: failed to sign announcement: {:?}",
802                e
803            )))
804        })?
805        .as_bytes()
806        .to_vec();
807
808        let announcement = IdentityAnnouncement {
809            agent_id: unsigned.agent_id,
810            machine_id: unsigned.machine_id,
811            user_id: unsigned.user_id,
812            agent_certificate: unsigned.agent_certificate,
813            machine_public_key: machine_public_key.clone(),
814            machine_signature,
815            addresses: unsigned.addresses,
816            announced_at,
817            nat_type,
818            can_receive_direct,
819            is_relay,
820            is_coordinator,
821        };
822        let encoded = bincode::serialize(&announcement).map_err(|e| {
823            error::IdentityError::Serialization(format!(
824                "heartbeat: failed to serialize announcement: {e}"
825            ))
826        })?;
827        self.runtime
828            .pubsub()
829            .publish(
830                IDENTITY_ANNOUNCE_TOPIC.to_string(),
831                bytes::Bytes::from(encoded),
832            )
833            .await
834            .map_err(|e| {
835                error::IdentityError::Storage(std::io::Error::other(format!(
836                    "heartbeat: publish failed: {e}"
837                )))
838            })?;
839        let now = Agent::unix_timestamp_secs();
840        self.cache.write().await.insert(
841            announcement.agent_id,
842            DiscoveredAgent {
843                agent_id: announcement.agent_id,
844                machine_id: announcement.machine_id,
845                user_id: announcement.user_id,
846                addresses: announcement.addresses,
847                announced_at: announcement.announced_at,
848                last_seen: now,
849                machine_public_key: machine_public_key.clone(),
850                nat_type: announcement.nat_type.clone(),
851                can_receive_direct: announcement.can_receive_direct,
852                is_relay: announcement.is_relay,
853                is_coordinator: announcement.is_coordinator,
854            },
855        );
856        Ok(())
857    }
858}
859
860impl Agent {
861    /// Create a new agent with default configuration.
862    ///
863    /// This generates a fresh identity with both machine and agent keypairs.
864    /// The machine keypair is stored persistently in `~/.x0x/machine.key`.
865    ///
866    /// For more control, use [`Agent::builder()`].
867    pub async fn new() -> error::Result<Self> {
868        Agent::builder().build().await
869    }
870
871    /// Create an [`AgentBuilder`] for fine-grained configuration.
872    ///
873    /// The builder supports:
874    /// - Custom machine key path via `with_machine_key()`
875    /// - Imported agent keypair via `with_agent_key()`
876    /// - User identity via `with_user_key()` or `with_user_key_path()`
877    pub fn builder() -> AgentBuilder {
878        AgentBuilder {
879            machine_key_path: None,
880            agent_keypair: None,
881            agent_key_path: None,
882            agent_cert_path: None,
883            user_keypair: None,
884            user_key_path: None,
885            network_config: None,
886            peer_cache_dir: None,
887            disable_peer_cache: false,
888            heartbeat_interval_secs: None,
889            identity_ttl_secs: None,
890            presence_beacon_interval_secs: None,
891            presence_event_poll_interval_secs: None,
892            presence_offline_timeout_secs: None,
893            contact_store_path: None,
894        }
895    }
896
897    /// Get the agent's identity.
898    ///
899    /// # Returns
900    ///
901    /// A reference to the agent's [`identity::Identity`].
902    #[inline]
903    #[must_use]
904    pub fn identity(&self) -> &identity::Identity {
905        &self.identity
906    }
907
908    /// Get the machine ID for this agent.
909    ///
910    /// The machine ID is tied to this computer and used for QUIC transport
911    /// authentication. It is stored persistently in `~/.x0x/machine.key`.
912    ///
913    /// # Returns
914    ///
915    /// The agent's machine ID.
916    #[inline]
917    #[must_use]
918    pub fn machine_id(&self) -> identity::MachineId {
919        self.identity.machine_id()
920    }
921
922    /// Get the agent ID for this agent.
923    ///
924    /// The agent ID is portable across machines and represents the agent's
925    /// persistent identity. It can be exported and imported to run the same
926    /// agent on different computers.
927    ///
928    /// # Returns
929    ///
930    /// The agent's ID.
931    #[inline]
932    #[must_use]
933    pub fn agent_id(&self) -> identity::AgentId {
934        self.identity.agent_id()
935    }
936
937    /// Get the user ID for this agent, if a user identity is bound.
938    ///
939    /// Returns `None` if no user keypair was provided during construction.
940    /// User keys are opt-in — they are never auto-generated.
941    #[inline]
942    #[must_use]
943    pub fn user_id(&self) -> Option<identity::UserId> {
944        self.identity.user_id()
945    }
946
947    /// Get the agent certificate, if one exists.
948    ///
949    /// The certificate cryptographically binds this agent to a user identity.
950    #[inline]
951    #[must_use]
952    pub fn agent_certificate(&self) -> Option<&identity::AgentCertificate> {
953        self.identity.agent_certificate()
954    }
955
956    /// Get the network node, if initialized.
957    #[must_use]
958    pub fn network(&self) -> Option<&std::sync::Arc<network::NetworkNode>> {
959        self.network.as_ref()
960    }
961
962    /// Get the gossip cache adapter for coordinator discovery.
963    ///
964    /// Returns `None` if this agent was built without a network config.
965    /// The adapter wraps the same `Arc<BootstrapCache>` as the network node.
966    pub fn gossip_cache_adapter(&self) -> Option<&saorsa_gossip_coordinator::GossipCacheAdapter> {
967        self.gossip_cache_adapter.as_ref()
968    }
969
970    /// Snapshot of pub/sub drop-detection counters.
971    ///
972    /// Returns `None` when the agent has no gossip runtime (e.g. offline
973    /// unit tests). Exposed through `GET /diagnostics/gossip` on x0xd so
974    /// that E2E harnesses can assert zero drops between publish and
975    /// subscriber delivery.
976    #[must_use]
977    pub fn gossip_stats(&self) -> Option<gossip::PubSubStatsSnapshot> {
978        self.gossip_runtime.as_ref().map(|rt| rt.pubsub().stats())
979    }
980
981    /// Get the presence system wrapper, if configured.
982    ///
983    /// Returns `None` if this agent was built without a network config.
984    /// The presence wrapper provides beacon broadcasting, FOAF discovery,
985    /// and online/offline event subscriptions.
986    #[must_use]
987    pub fn presence_system(&self) -> Option<&std::sync::Arc<presence::PresenceWrapper>> {
988        self.presence.as_ref()
989    }
990
991    /// Get a reference to the contact store.
992    ///
993    /// The contact store persists trust levels and machine records for known
994    /// agents. It is backed by `~/.x0x/contacts.json` by default.
995    ///
996    /// Use [`with_contact_store_path`](AgentBuilder::with_contact_store_path)
997    /// on the builder to customise the path.
998    #[must_use]
999    pub fn contacts(&self) -> &std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>> {
1000        &self.contact_store
1001    }
1002
1003    /// Get the reachability information for a discovered agent.
1004    ///
1005    /// Returns `None` if the agent is not in the discovery cache.
1006    /// Use [`Agent::announce_identity`] or wait for a heartbeat announcement
1007    /// to populate the cache.
1008    pub async fn reachability(
1009        &self,
1010        agent_id: &identity::AgentId,
1011    ) -> Option<connectivity::ReachabilityInfo> {
1012        let cache = self.identity_discovery_cache.read().await;
1013        cache
1014            .get(agent_id)
1015            .map(connectivity::ReachabilityInfo::from_discovered)
1016    }
1017
1018    async fn seed_transport_peer_hints_for_target(
1019        &self,
1020        network: &network::NetworkNode,
1021        target: &DiscoveredAgent,
1022    ) -> error::Result<()> {
1023        fn merge_helper_hint(
1024            hints: &mut std::collections::HashMap<
1025                ant_quic::PeerId,
1026                (
1027                    Vec<std::net::SocketAddr>,
1028                    ant_quic::bootstrap_cache::PeerCapabilities,
1029                ),
1030            >,
1031            peer_id: ant_quic::PeerId,
1032            addrs: impl IntoIterator<Item = std::net::SocketAddr>,
1033            supports_coordination: bool,
1034            supports_relay: bool,
1035        ) {
1036            let entry = hints.entry(peer_id).or_insert_with(|| {
1037                (
1038                    Vec::new(),
1039                    ant_quic::bootstrap_cache::PeerCapabilities::default(),
1040                )
1041            });
1042            for addr in addrs {
1043                if !entry.0.contains(&addr) {
1044                    entry.0.push(addr);
1045                }
1046            }
1047            if supports_coordination {
1048                entry.1.supports_coordination = true;
1049            }
1050            if supports_relay {
1051                entry.1.supports_relay = true;
1052            }
1053        }
1054
1055        let target_peer_id = ant_quic::PeerId(target.machine_id.0);
1056        if target.machine_id.0 != [0u8; 32] {
1057            network
1058                .upsert_peer_hints(target_peer_id, target.addresses.clone(), None)
1059                .await
1060                .map_err(|e| {
1061                    error::IdentityError::Storage(std::io::Error::other(format!(
1062                        "failed to upsert target peer hints: {e}"
1063                    )))
1064                })?;
1065        }
1066
1067        let mut helper_hints: std::collections::HashMap<
1068            ant_quic::PeerId,
1069            (
1070                Vec<std::net::SocketAddr>,
1071                ant_quic::bootstrap_cache::PeerCapabilities,
1072            ),
1073        > = std::collections::HashMap::new();
1074
1075        if let Some(ref cache) = self.bootstrap_cache {
1076            for peer in cache.select_coordinators(6).await {
1077                merge_helper_hint(
1078                    &mut helper_hints,
1079                    peer.peer_id,
1080                    peer.preferred_addresses(),
1081                    true,
1082                    false,
1083                );
1084            }
1085            for peer in cache.select_relay_peers(6).await {
1086                merge_helper_hint(
1087                    &mut helper_hints,
1088                    peer.peer_id,
1089                    peer.preferred_addresses(),
1090                    false,
1091                    true,
1092                );
1093            }
1094        }
1095
1096        if let Some(ref adapter) = self.gossip_cache_adapter {
1097            let mut adverts = adapter.get_all_adverts();
1098            adverts.sort_by_key(|a| std::cmp::Reverse(a.score));
1099            for advert in adverts.into_iter().take(12) {
1100                let advert_peer_id = ant_quic::PeerId(*advert.peer.as_bytes());
1101                if advert_peer_id == target_peer_id {
1102                    continue;
1103                }
1104                merge_helper_hint(
1105                    &mut helper_hints,
1106                    advert_peer_id,
1107                    advert
1108                        .addr_hints
1109                        .into_iter()
1110                        .map(|hint| hint.addr)
1111                        .filter(|addr| is_publicly_advertisable(*addr)),
1112                    advert.roles.coordinator || advert.roles.rendezvous,
1113                    advert.roles.relay,
1114                );
1115            }
1116        }
1117
1118        let discovered: Vec<DiscoveredAgent> = {
1119            let cache = self.identity_discovery_cache.read().await;
1120            cache.values().cloned().collect()
1121        };
1122        for candidate in discovered {
1123            if candidate.agent_id == target.agent_id
1124                || candidate.machine_id == target.machine_id
1125                || candidate.machine_id.0 == [0u8; 32]
1126            {
1127                continue;
1128            }
1129            merge_helper_hint(
1130                &mut helper_hints,
1131                ant_quic::PeerId(candidate.machine_id.0),
1132                candidate.addresses.iter().copied(),
1133                candidate.is_coordinator == Some(true),
1134                candidate.is_relay == Some(true),
1135            );
1136        }
1137
1138        for (peer_id, (mut addrs, caps)) in helper_hints {
1139            addrs.retain(|addr| !target.addresses.contains(addr));
1140            if addrs.is_empty() && !caps.supports_coordination && !caps.supports_relay {
1141                continue;
1142            }
1143            network
1144                .upsert_peer_hints(peer_id, addrs, Some(caps))
1145                .await
1146                .map_err(|e| {
1147                    error::IdentityError::Storage(std::io::Error::other(format!(
1148                        "failed to upsert helper peer hints: {e}"
1149                    )))
1150                })?;
1151        }
1152
1153        Ok(())
1154    }
1155
1156    /// Attempt to connect to an agent by its identity.
1157    ///
1158    /// Looks up the agent in the discovery cache, then tries to establish
1159    /// a QUIC connection using the best available strategy:
1160    ///
1161    /// 1. **Direct** — if the agent reports `can_receive_direct: true` or
1162    ///    has a traversable NAT type, try each known address in order.
1163    /// 2. **Coordinated** — if direct fails or the agent reports a symmetric
1164    ///    NAT, the outcome is `Coordinated` if any address was reachable via
1165    ///    the network layer's NAT traversal.
1166    /// 3. **Unreachable** — no address succeeded.
1167    /// 4. **NotFound** — the agent is not in the discovery cache.
1168    ///
1169    /// # Errors
1170    ///
1171    /// Returns an error only for internal failures (e.g. network not started).
1172    /// Connectivity failures are reported as `ConnectOutcome::Unreachable`.
1173    pub async fn connect_to_agent(
1174        &self,
1175        agent_id: &identity::AgentId,
1176    ) -> error::Result<connectivity::ConnectOutcome> {
1177        let call_start = std::time::Instant::now();
1178        let agent_prefix = network::hex_prefix(&agent_id.0, 4);
1179        tracing::debug!(
1180            target: "x0x::connect",
1181            stage = "connect_to_agent",
1182            %agent_prefix,
1183            "begin"
1184        );
1185        // 1. Look up in discovery cache
1186        let discovered = {
1187            let cache = self.identity_discovery_cache.read().await;
1188            cache.get(agent_id).cloned()
1189        };
1190
1191        let agent = match discovered {
1192            Some(a) => a,
1193            None => {
1194                tracing::info!(
1195                    target: "x0x::connect",
1196                    stage = "connect_to_agent",
1197                    %agent_prefix,
1198                    outcome = "not_found",
1199                    dur_ms = call_start.elapsed().as_millis() as u64,
1200                    "agent not in discovery cache"
1201                );
1202                return Ok(connectivity::ConnectOutcome::NotFound);
1203            }
1204        };
1205
1206        let info = connectivity::ReachabilityInfo::from_discovered(&agent);
1207        let v4_addrs = info.addresses.iter().filter(|a| a.is_ipv4()).count();
1208        let v6_addrs = info.addresses.len() - v4_addrs;
1209        tracing::info!(
1210            target: "x0x::connect",
1211            stage = "connect_to_agent",
1212            %agent_prefix,
1213            machine_prefix = %network::hex_prefix(&agent.machine_id.0, 4),
1214            addr_total = info.addresses.len(),
1215            v4_addrs,
1216            v6_addrs,
1217            can_receive_direct = ?info.can_receive_direct,
1218            should_attempt_direct = info.should_attempt_direct(),
1219            needs_coordination = info.needs_coordination(),
1220            "reachability classified"
1221        );
1222
1223        let Some(ref network) = self.network else {
1224            tracing::warn!(
1225                target: "x0x::connect",
1226                stage = "connect_to_agent",
1227                %agent_prefix,
1228                outcome = "unreachable_no_network",
1229                "network layer not initialised"
1230            );
1231            return Ok(connectivity::ConnectOutcome::Unreachable);
1232        };
1233
1234        // 2. If already connected via gossip, reuse that connection.
1235        //    This check MUST come before the empty-address bail-out because
1236        //    LAN/private agents may have no publicly-routable addresses in
1237        //    their announcement but are still reachable via the existing
1238        //    gossip QUIC connection.
1239        let connected_machine_id = if agent.machine_id.0 != [0u8; 32]
1240            && network
1241                .is_connected(&ant_quic::PeerId(agent.machine_id.0))
1242                .await
1243        {
1244            Some(agent.machine_id)
1245        } else {
1246            match self.direct_messaging.get_machine_id(agent_id).await {
1247                Some(machine_id) if network.is_connected(&ant_quic::PeerId(machine_id.0)).await => {
1248                    Some(machine_id)
1249                }
1250                _ => None,
1251            }
1252        };
1253        if let Some(machine_id) = connected_machine_id {
1254            if machine_id != agent.machine_id {
1255                let mut cache = self.identity_discovery_cache.write().await;
1256                if let Some(entry) = cache.get_mut(agent_id) {
1257                    entry.machine_id = machine_id;
1258                }
1259            }
1260            self.direct_messaging
1261                .mark_connected(agent.agent_id, machine_id)
1262                .await;
1263            let dur_ms = call_start.elapsed().as_millis() as u64;
1264            return if let Some(addr) = info.addresses.first() {
1265                let family = if addr.is_ipv4() { "v4" } else { "v6" };
1266                tracing::info!(
1267                    target: "x0x::connect",
1268                    stage = "connect_to_agent",
1269                    %agent_prefix,
1270                    strategy = "already_connected",
1271                    outcome = "direct",
1272                    selected_addr = %addr,
1273                    family,
1274                    dur_ms,
1275                    "reusing existing connection"
1276                );
1277                Ok(connectivity::ConnectOutcome::Direct(*addr))
1278            } else {
1279                tracing::info!(
1280                    target: "x0x::connect",
1281                    stage = "connect_to_agent",
1282                    %agent_prefix,
1283                    strategy = "already_connected",
1284                    outcome = "already_connected",
1285                    dur_ms,
1286                    "reusing existing connection without known addr"
1287                );
1288                Ok(connectivity::ConnectOutcome::AlreadyConnected)
1289            };
1290        }
1291
1292        if info.addresses.is_empty() {
1293            tracing::info!(
1294                target: "x0x::connect",
1295                stage = "connect_to_agent",
1296                %agent_prefix,
1297                outcome = "unreachable",
1298                reason = "no_addresses",
1299                dur_ms = call_start.elapsed().as_millis() as u64,
1300                "no known addresses for agent"
1301            );
1302            return Ok(connectivity::ConnectOutcome::Unreachable);
1303        }
1304
1305        let dial_timeout = std::time::Duration::from_secs(8);
1306
1307        // 3. If we know the peer's machine ID, prefer a peer-authenticated dial
1308        //    with explicit address hints first. This is more reliable for agent
1309        //    cards and other out-of-band discoveries than a raw address dial.
1310        if agent.machine_id.0 != [0u8; 32] {
1311            let peer_id_hint = ant_quic::PeerId(agent.machine_id.0);
1312            self.seed_transport_peer_hints_for_target(network, &agent)
1313                .await
1314                .map_err(|e| {
1315                    error::IdentityError::Storage(std::io::Error::other(format!(
1316                        "failed to seed transport peer hints: {e}"
1317                    )))
1318                })?;
1319
1320            match tokio::time::timeout(
1321                dial_timeout,
1322                network.connect_peer_with_addrs(peer_id_hint, info.addresses.clone()),
1323            )
1324            .await
1325            {
1326                Ok(Ok((addr, verified_peer_id))) => {
1327                    let verified_machine_id = identity::MachineId(verified_peer_id.0);
1328                    if let Some(ref bc) = self.bootstrap_cache {
1329                        bc.add_from_connection(verified_peer_id, vec![addr], None)
1330                            .await;
1331                        bc.record_success(&verified_peer_id, 0).await;
1332                    }
1333                    {
1334                        let mut cache = self.identity_discovery_cache.write().await;
1335                        if let Some(entry) = cache.get_mut(agent_id) {
1336                            entry.machine_id = verified_machine_id;
1337                        }
1338                    }
1339                    self.direct_messaging
1340                        .mark_connected(agent.agent_id, verified_machine_id)
1341                        .await;
1342                    let family = if addr.is_ipv4() { "v4" } else { "v6" };
1343                    tracing::info!(
1344                        target: "x0x::connect",
1345                        stage = "connect_to_agent",
1346                        %agent_prefix,
1347                        strategy = "hinted_peer",
1348                        outcome = "coordinated",
1349                        selected_addr = %addr,
1350                        family,
1351                        dur_ms = call_start.elapsed().as_millis() as u64,
1352                        "hinted peer dial succeeded"
1353                    );
1354                    return Ok(connectivity::ConnectOutcome::Coordinated(addr));
1355                }
1356                Ok(Err(e)) => {
1357                    tracing::debug!(
1358                        target: "x0x::connect",
1359                        %agent_prefix,
1360                        strategy = "hinted_peer",
1361                        error = %e,
1362                        "hinted peer dial failed"
1363                    );
1364                }
1365                Err(_) => {
1366                    tracing::debug!(
1367                        target: "x0x::connect",
1368                        %agent_prefix,
1369                        strategy = "hinted_peer",
1370                        timeout_s = dial_timeout.as_secs(),
1371                        "hinted peer dial timed out"
1372                    );
1373                }
1374            }
1375        }
1376
1377        // 4. Try direct connection whenever the peer is not explicitly known
1378        //    to require coordination. Unknown reachability still deserves a
1379        //    direct probe, especially for the first nodes in a new network.
1380        if info.should_attempt_direct() {
1381            for addr in &info.addresses {
1382                match tokio::time::timeout(dial_timeout, network.connect_addr(*addr)).await {
1383                    Ok(Ok(connected_peer_id)) => {
1384                        // Use the real PeerId from the QUIC handshake (may differ
1385                        // from a zeroed placeholder in the discovery cache).
1386                        let real_machine_id = identity::MachineId(connected_peer_id.0);
1387                        // Enrich bootstrap cache with this successful address
1388                        if let Some(ref bc) = self.bootstrap_cache {
1389                            bc.add_from_connection(connected_peer_id, vec![*addr], None)
1390                                .await;
1391                        }
1392                        // Update discovery cache with real machine_id
1393                        {
1394                            let mut cache = self.identity_discovery_cache.write().await;
1395                            if let Some(entry) = cache.get_mut(agent_id) {
1396                                entry.machine_id = real_machine_id;
1397                            }
1398                        }
1399                        // Register agent mapping for direct messaging
1400                        self.direct_messaging
1401                            .mark_connected(agent.agent_id, real_machine_id)
1402                            .await;
1403                        let family = if addr.is_ipv4() { "v4" } else { "v6" };
1404                        tracing::info!(
1405                            target: "x0x::connect",
1406                            stage = "connect_to_agent",
1407                            %agent_prefix,
1408                            strategy = "direct_per_addr",
1409                            outcome = "direct",
1410                            selected_addr = %addr,
1411                            family,
1412                            dur_ms = call_start.elapsed().as_millis() as u64,
1413                            "direct dial succeeded"
1414                        );
1415                        return Ok(connectivity::ConnectOutcome::Direct(*addr));
1416                    }
1417                    Ok(Err(e)) => {
1418                        tracing::debug!(
1419                            target: "x0x::connect",
1420                            %agent_prefix,
1421                            strategy = "direct_per_addr",
1422                            %addr,
1423                            error = %e,
1424                            "direct dial failed"
1425                        );
1426                    }
1427                    Err(_) => {
1428                        tracing::debug!(
1429                            target: "x0x::connect",
1430                            %agent_prefix,
1431                            strategy = "direct_per_addr",
1432                            %addr,
1433                            timeout_s = dial_timeout.as_secs(),
1434                            "direct dial timed out"
1435                        );
1436                    }
1437                }
1438            }
1439        }
1440
1441        // 5. If direct failed and coordination may help, use peer-ID dialing
1442        //    with explicit address hints. This lets ant-quic combine the
1443        //    authenticated peer ID with known addresses from x0x discovery /
1444        //    imported cards, unlocking the full direct → hole-punch → relay path.
1445        if info.needs_coordination() || !info.should_attempt_direct() {
1446            // Use the machine_id from discovery cache as the peer_id hint.
1447            // NOTE: This may be a zeroed placeholder if the peer was discovered via
1448            // gossip and hasn't been verified via QUIC handshake yet.
1449            let peer_id_hint = ant_quic::PeerId(agent.machine_id.0);
1450            let hint_was_zeroed = agent.machine_id.0 == [0u8; 32];
1451            self.seed_transport_peer_hints_for_target(network, &agent)
1452                .await
1453                .map_err(|e| {
1454                    error::IdentityError::Storage(std::io::Error::other(format!(
1455                        "failed to seed transport peer hints: {e}"
1456                    )))
1457                })?;
1458            let coordinated_result = tokio::time::timeout(
1459                dial_timeout,
1460                network.connect_peer_with_addrs(peer_id_hint, info.addresses.clone()),
1461            )
1462            .await;
1463            match coordinated_result {
1464                Ok(Ok((addr, verified_peer_id))) => {
1465                    let verified_machine_id = identity::MachineId(verified_peer_id.0);
1466
1467                    // Only update caches if the original hint was not zeroed.
1468                    // When the hint was zeroed, we connected to *some* peer at that address
1469                    // but have no way to verify they are the agent we intended. Writing
1470                    // an unverified peer_id into the caches could corrupt the bootstrap cache
1471                    // with the wrong peer's identity.
1472                    if !hint_was_zeroed {
1473                        if let Some(ref bc) = self.bootstrap_cache {
1474                            bc.add_from_connection(verified_peer_id, vec![addr], None)
1475                                .await;
1476                            bc.record_success(&verified_peer_id, 0).await;
1477                        }
1478                        {
1479                            let mut cache = self.identity_discovery_cache.write().await;
1480                            if let Some(entry) = cache.get_mut(agent_id) {
1481                                entry.machine_id = verified_machine_id;
1482                            }
1483                        }
1484                    }
1485
1486                    // Only register for direct messaging and update caches when the hint
1487                    // was non-zero. When the hint was zeroed, we connected to *some*
1488                    // peer at that address but have no cryptographic way to verify they
1489                    // are the agent we intended. Binding an unverified peer_id to an
1490                    // agent_id could corrupt the direct-messaging registry with the
1491                    // wrong peer's identity.
1492                    if !hint_was_zeroed {
1493                        self.direct_messaging
1494                            .mark_connected(agent.agent_id, verified_machine_id)
1495                            .await;
1496                    }
1497                    let family = if addr.is_ipv4() { "v4" } else { "v6" };
1498                    tracing::info!(
1499                        target: "x0x::connect",
1500                        stage = "connect_to_agent",
1501                        %agent_prefix,
1502                        strategy = "coordinated_fallback",
1503                        outcome = "coordinated",
1504                        selected_addr = %addr,
1505                        family,
1506                        hint_was_zeroed,
1507                        dur_ms = call_start.elapsed().as_millis() as u64,
1508                        "coordinated dial succeeded"
1509                    );
1510                    return Ok(connectivity::ConnectOutcome::Coordinated(addr));
1511                }
1512                Ok(Err(e)) => {
1513                    tracing::debug!(
1514                        target: "x0x::connect",
1515                        %agent_prefix,
1516                        strategy = "coordinated_fallback",
1517                        error = %e,
1518                        "coordinated dial failed"
1519                    );
1520                }
1521                Err(_) => {
1522                    tracing::debug!(
1523                        target: "x0x::connect",
1524                        %agent_prefix,
1525                        strategy = "coordinated_fallback",
1526                        timeout_s = dial_timeout.as_secs(),
1527                        "coordinated dial timed out"
1528                    );
1529                }
1530            }
1531        }
1532
1533        tracing::warn!(
1534            target: "x0x::connect",
1535            stage = "connect_to_agent",
1536            %agent_prefix,
1537            outcome = "unreachable",
1538            reason = "all_strategies_exhausted",
1539            dur_ms = call_start.elapsed().as_millis() as u64,
1540            v4_addrs,
1541            v6_addrs,
1542            "all connection strategies exhausted"
1543        );
1544        Ok(connectivity::ConnectOutcome::Unreachable)
1545    }
1546
1547    /// Save the bootstrap cache and release resources.
1548    ///
1549    /// Call this before dropping the agent to ensure the peer cache is
1550    /// persisted to disk. The background maintenance task saves periodically,
1551    /// but this guarantees a final save.
1552    pub async fn shutdown(&self) {
1553        // Shut down presence beacons.
1554        if let Some(ref pw) = self.presence {
1555            pw.shutdown().await;
1556            tracing::info!("Presence system shut down");
1557        }
1558
1559        if let Some(ref cache) = self.bootstrap_cache {
1560            if let Err(e) = cache.save().await {
1561                tracing::warn!("Failed to save bootstrap cache on shutdown: {e}");
1562            } else {
1563                tracing::info!("Bootstrap cache saved on shutdown");
1564            }
1565        }
1566    }
1567
1568    // === Direct Messaging ===
1569
1570    /// Send data directly to a connected agent.
1571    ///
1572    /// This bypasses gossip pub/sub for efficient point-to-point communication.
1573    /// The agent must be connected first via [`Self::connect_to_agent`].
1574    ///
1575    /// # Arguments
1576    ///
1577    /// * `agent_id` - The target agent's identifier.
1578    /// * `payload` - The data to send.
1579    ///
1580    /// # Errors
1581    ///
1582    /// Returns an error if:
1583    /// - Network is not initialized
1584    /// - Agent is not connected
1585    /// - Agent is not found in discovery cache
1586    /// - Send fails
1587    ///
1588    /// # Example
1589    ///
1590    /// ```rust,ignore
1591    /// // First connect to the agent
1592    /// let outcome = agent.connect_to_agent(&target_agent_id).await?;
1593    ///
1594    /// // Then send data directly
1595    /// agent.send_direct(&target_agent_id, b"hello".to_vec()).await?;
1596    /// ```
1597    /// Send data directly to an agent — capability-aware dispatch.
1598    ///
1599    /// Looks up the recipient's `DmCapabilities` in the local
1600    /// [`dm_capability::CapabilityStore`]. If the recipient advertises
1601    /// `gossip_inbox=true` with a non-empty `kem_public_key`, the send
1602    /// goes via the gossip DM path (signed+encrypted envelope published
1603    /// to the recipient's inbox topic with an application-layer ACK).
1604    /// Otherwise, falls back to the legacy raw-QUIC stream.
1605    ///
1606    /// # Errors
1607    ///
1608    /// See [`dm::DmError`].
1609    pub async fn send_direct(
1610        &self,
1611        to: &identity::AgentId,
1612        payload: Vec<u8>,
1613    ) -> Result<dm::DmReceipt, dm::DmError> {
1614        self.send_direct_with_config(to, payload, dm::DmSendConfig::default())
1615            .await
1616    }
1617
1618    /// Like [`Self::send_direct`] with caller-provided [`dm::DmSendConfig`].
1619    ///
1620    /// # Errors
1621    ///
1622    /// See [`dm::DmError`].
1623    pub async fn send_direct_with_config(
1624        &self,
1625        to: &identity::AgentId,
1626        payload: Vec<u8>,
1627        config: dm::DmSendConfig,
1628    ) -> Result<dm::DmReceipt, dm::DmError> {
1629        let cap = self.capability_store.lookup(to);
1630        let gossip_ok = cap
1631            .as_ref()
1632            .map(|c| c.gossip_inbox && !c.kem_public_key.is_empty())
1633            .unwrap_or(false);
1634
1635        if gossip_ok {
1636            let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1637                dm::DmError::LocalGossipUnavailable(
1638                    "send_direct: no gossip runtime configured".to_string(),
1639                )
1640            })?;
1641            let signing = gossip::SigningContext::from_keypair(self.identity.agent_keypair());
1642            let kem_pub = cap
1643                .as_ref()
1644                .map(|c| c.kem_public_key.clone())
1645                .unwrap_or_default();
1646            return dm_send::send_via_gossip(
1647                std::sync::Arc::clone(runtime.pubsub()),
1648                &signing,
1649                self.identity.agent_id(),
1650                self.identity.machine_id(),
1651                *to,
1652                &kem_pub,
1653                payload,
1654                &config,
1655                std::sync::Arc::clone(&self.dm_inflight_acks),
1656            )
1657            .await;
1658        }
1659
1660        if config.require_gossip {
1661            return Err(dm::DmError::RecipientKeyUnavailable(format!(
1662                "recipient {} has no gossip DM capability advert",
1663                hex::encode(to.as_bytes())
1664            )));
1665        }
1666
1667        self.send_direct_raw_quic(to, payload)
1668            .await
1669            .map(|_| dm_send::raw_quic_receipt())
1670            .map_err(|e| match e {
1671                error::NetworkError::AgentNotFound(_)
1672                | error::NetworkError::AgentNotConnected(_) => {
1673                    dm::DmError::RecipientKeyUnavailable(e.to_string())
1674                }
1675                other => dm::DmError::PublishFailed(other.to_string()),
1676            })
1677    }
1678
1679    /// Legacy raw-QUIC direct-send path. Internal fallback only.
1680    async fn send_direct_raw_quic(
1681        &self,
1682        agent_id: &identity::AgentId,
1683        payload: Vec<u8>,
1684    ) -> error::NetworkResult<()> {
1685        let send_start = std::time::Instant::now();
1686        let agent_prefix = network::hex_prefix(&agent_id.0, 4);
1687        let self_prefix = network::hex_prefix(&self.identity.agent_id().0, 4);
1688        let bytes = payload.len();
1689
1690        let network = self.network.as_ref().ok_or_else(|| {
1691            tracing::warn!(
1692                target: "x0x::direct",
1693                stage = "send",
1694                %agent_prefix,
1695                outcome = "err_no_network",
1696                "network not initialised"
1697            );
1698            error::NetworkError::NodeCreation("network not initialized".to_string())
1699        })?;
1700
1701        // Resolve the best known machine_id, preferring a machine that is
1702        // actually connected right now. Discovery cache entries can lag behind
1703        // the direct-messaging registry when an inbound connection is accepted
1704        // and later reconciled from transport events.
1705        let cached_machine_id = {
1706            let cache = self.identity_discovery_cache.read().await;
1707            cache
1708                .get(agent_id)
1709                .map(|d| d.machine_id)
1710                .filter(|m| m.0 != [0u8; 32]) // Ignore placeholder zeroed IDs
1711        };
1712        let registry_machine_id = self.direct_messaging.get_machine_id(agent_id).await;
1713
1714        let (machine_id, resolution) = match (cached_machine_id, registry_machine_id) {
1715            (Some(id), _) if network.is_connected(&ant_quic::PeerId(id.0)).await => {
1716                (id, "cached_connected")
1717            }
1718            (_, Some(id)) if network.is_connected(&ant_quic::PeerId(id.0)).await => {
1719                if cached_machine_id != Some(id) {
1720                    let mut cache = self.identity_discovery_cache.write().await;
1721                    if let Some(entry) = cache.get_mut(agent_id) {
1722                        entry.machine_id = id;
1723                    }
1724                }
1725                (id, "registry_connected")
1726            }
1727            (Some(id), None) => (id, "cached_not_connected"),
1728            (Some(id), Some(_)) => (id, "cached_both_disconnected"),
1729            (None, Some(id)) => (id, "registry_not_connected"),
1730            (None, None) => {
1731                tracing::debug!(
1732                    target: "x0x::direct",
1733                    stage = "send",
1734                    %agent_prefix,
1735                    resolution = "last_resort_connect",
1736                    "no machine_id known; triggering connect_to_agent"
1737                );
1738                let _ = self.connect_to_agent(agent_id).await;
1739                let id = self
1740                    .direct_messaging
1741                    .get_machine_id(agent_id)
1742                    .await
1743                    .ok_or_else(|| {
1744                        tracing::warn!(
1745                            target: "x0x::direct",
1746                            stage = "send",
1747                            %agent_prefix,
1748                            outcome = "err_agent_not_found",
1749                            dur_ms = send_start.elapsed().as_millis() as u64,
1750                            "no machine_id after connect_to_agent"
1751                        );
1752                        error::NetworkError::AgentNotFound(agent_id.0)
1753                    })?;
1754                (id, "post_connect")
1755            }
1756        };
1757
1758        // Check if connected
1759        let ant_peer_id = ant_quic::PeerId(machine_id.0);
1760        let machine_prefix = network::hex_prefix(&machine_id.0, 4);
1761        if !network.is_connected(&ant_peer_id).await {
1762            tracing::warn!(
1763                target: "x0x::direct",
1764                stage = "send",
1765                %agent_prefix,
1766                %machine_prefix,
1767                resolution,
1768                outcome = "err_not_connected",
1769                bytes,
1770                dur_ms = send_start.elapsed().as_millis() as u64,
1771                "machine_id resolved but peer not currently connected"
1772            );
1773            return Err(error::NetworkError::AgentNotConnected(agent_id.0));
1774        }
1775
1776        // Send via network layer
1777        match network
1778            .send_direct(&ant_peer_id, &self.identity.agent_id().0, &payload)
1779            .await
1780        {
1781            Ok(()) => {
1782                tracing::info!(
1783                    target: "x0x::direct",
1784                    stage = "send",
1785                    from = %self_prefix,
1786                    to = %agent_prefix,
1787                    %machine_prefix,
1788                    resolution,
1789                    bytes,
1790                    dur_ms = send_start.elapsed().as_millis() as u64,
1791                    outcome = "ok",
1792                    "direct message sent"
1793                );
1794                Ok(())
1795            }
1796            Err(e) => {
1797                tracing::warn!(
1798                    target: "x0x::direct",
1799                    stage = "send",
1800                    from = %self_prefix,
1801                    to = %agent_prefix,
1802                    %machine_prefix,
1803                    resolution,
1804                    bytes,
1805                    dur_ms = send_start.elapsed().as_millis() as u64,
1806                    outcome = "err_transport",
1807                    error = %e,
1808                    "transport send_direct failed"
1809                );
1810                Err(e)
1811            }
1812        }
1813    }
1814
1815    /// Receive the next direct message from any connected agent.
1816    ///
1817    /// Blocks until a direct message is received.
1818    ///
1819    /// # Security Note
1820    ///
1821    /// This method does **not** apply trust filtering from `ContactStore`.
1822    /// Messages from blocked agents will still be delivered. Use
1823    /// [`recv_direct_annotated()`](Self::recv_direct_annotated) if you need
1824    /// trust-based filtering.
1825    ///
1826    /// # Returns
1827    ///
1828    /// The received [`DirectMessage`] containing sender, payload, and timestamp.
1829    ///
1830    /// # Example
1831    ///
1832    /// ```rust,ignore
1833    /// loop {
1834    ///     if let Some(msg) = agent.recv_direct().await {
1835    ///         println!("From {:?}: {:?}", msg.sender, msg.payload_str());
1836    ///     }
1837    /// }
1838    /// ```
1839    pub async fn recv_direct(&self) -> Option<direct::DirectMessage> {
1840        self.recv_direct_inner().await
1841    }
1842
1843    /// Receive the next direct message, filtering by trust level.
1844    ///
1845    /// All messages now carry pre-computed `verified` and `trust_decision`
1846    /// fields from the identity discovery cache and contact store. This
1847    /// method passes through all messages — applications should inspect
1848    /// `msg.trust_decision` and `msg.verified` to decide how to handle
1849    /// each message.
1850    ///
1851    /// # Returns
1852    ///
1853    /// The received [`DirectMessage`], or `None` if the channel closes.
1854    ///
1855    /// # Example
1856    ///
1857    /// ```rust,ignore
1858    /// loop {
1859    ///     if let Some(msg) = agent.recv_direct_annotated().await {
1860    ///         match msg.trust_decision {
1861    ///             Some(TrustDecision::RejectBlocked) => continue, // skip
1862    ///             Some(TrustDecision::Accept) if msg.verified => { /* trusted */ }
1863    ///             _ => { /* handle accordingly */ }
1864    ///         }
1865    ///     }
1866    /// }
1867    /// ```
1868    pub async fn recv_direct_annotated(&self) -> Option<direct::DirectMessage> {
1869        self.recv_direct_inner().await
1870    }
1871
1872    /// Internal helper for receiving direct messages.
1873    ///
1874    /// Reads from the `DirectMessaging` internal channel, which is fed by
1875    /// the background `start_direct_listener` task. This ensures there is
1876    /// only ONE consumer of `network.recv_direct()` (the listener), avoiding
1877    /// message-stealing races.
1878    async fn recv_direct_inner(&self) -> Option<direct::DirectMessage> {
1879        self.direct_messaging.recv().await
1880    }
1881
1882    /// Subscribe to direct messages.
1883    ///
1884    /// Returns a receiver that can be cloned for multiple consumers.
1885    /// Messages are broadcast to all receivers.
1886    ///
1887    /// # Example
1888    ///
1889    /// ```rust,ignore
1890    /// let mut rx = agent.subscribe_direct();
1891    /// tokio::spawn(async move {
1892    ///     while let Some(msg) = rx.recv().await {
1893    ///         println!("Direct message: {:?}", msg);
1894    ///     }
1895    /// });
1896    /// ```
1897    pub fn subscribe_direct(&self) -> direct::DirectMessageReceiver {
1898        self.direct_messaging.subscribe()
1899    }
1900
1901    /// Get the direct messaging infrastructure.
1902    ///
1903    /// Provides low-level access to connection tracking and agent mappings.
1904    pub fn direct_messaging(&self) -> &std::sync::Arc<direct::DirectMessaging> {
1905        &self.direct_messaging
1906    }
1907
1908    /// Check if an agent is currently connected for direct messaging.
1909    ///
1910    /// # Arguments
1911    ///
1912    /// * `agent_id` - The agent to check.
1913    ///
1914    /// # Returns
1915    ///
1916    /// `true` if a QUIC connection exists to this agent's machine.
1917    pub async fn is_agent_connected(&self, agent_id: &identity::AgentId) -> bool {
1918        let Some(network) = &self.network else {
1919            return false;
1920        };
1921
1922        // Look up machine_id from discovery cache
1923        let machine_id = {
1924            let cache = self.identity_discovery_cache.read().await;
1925            cache.get(agent_id).map(|d| d.machine_id)
1926        };
1927
1928        match machine_id {
1929            Some(mid) => {
1930                let ant_peer_id = ant_quic::PeerId(mid.0);
1931                network.is_connected(&ant_peer_id).await
1932            }
1933            None => false,
1934        }
1935    }
1936
1937    /// Get list of currently connected agents.
1938    ///
1939    /// Returns agents that have been discovered and are currently connected
1940    /// via QUIC transport.
1941    pub async fn connected_agents(&self) -> Vec<identity::AgentId> {
1942        let Some(network) = &self.network else {
1943            return Vec::new();
1944        };
1945
1946        let connected_peers = network.connected_peers().await;
1947        let cache = self.identity_discovery_cache.read().await;
1948
1949        // Find agents whose machine_id matches a connected peer
1950        cache
1951            .values()
1952            .filter(|agent| {
1953                let ant_peer_id = ant_quic::PeerId(agent.machine_id.0);
1954                connected_peers.contains(&ant_peer_id)
1955            })
1956            .map(|agent| agent.agent_id)
1957            .collect()
1958    }
1959
1960    /// Attach a contact store for trust-based message filtering.
1961    ///
1962    /// When set, the gossip pub/sub layer will:
1963    /// - Drop messages from `Blocked` senders (don't deliver, don't rebroadcast)
1964    /// - Annotate messages with the sender's trust level for consumers
1965    ///
1966    /// Without a contact store, all messages pass through (open relay mode).
1967    pub fn set_contacts(&self, store: std::sync::Arc<tokio::sync::RwLock<contacts::ContactStore>>) {
1968        if let Some(runtime) = &self.gossip_runtime {
1969            runtime.pubsub().set_contacts(store);
1970        }
1971    }
1972
1973    /// Announce this agent's identity on the network discovery topic.
1974    ///
1975    /// By default, announcements include agent + machine identity only.
1976    /// Human identity disclosure is opt-in and requires explicit consent.
1977    ///
1978    /// # Arguments
1979    ///
1980    /// * `include_user_identity` - Whether to include `user_id` and certificate
1981    /// * `human_consent` - Must be `true` when disclosing user identity
1982    ///
1983    /// # Errors
1984    ///
1985    /// Returns an error if:
1986    /// - Gossip runtime is not initialized
1987    /// - Human identity disclosure is requested without explicit consent
1988    /// - Human identity disclosure is requested but no user identity is configured
1989    /// - Serialization or publish fails
1990    pub async fn announce_identity(
1991        &self,
1992        include_user_identity: bool,
1993        human_consent: bool,
1994    ) -> error::Result<()> {
1995        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
1996            error::IdentityError::Storage(std::io::Error::other(
1997                "gossip runtime not initialized - configure agent with network first",
1998            ))
1999        })?;
2000
2001        self.start_identity_listener().await?;
2002
2003        // Include ALL routable addresses (IPv4 and IPv6).
2004        let mut addresses = if let Some(network) = self.network.as_ref() {
2005            match network.node_status().await {
2006                Some(status) if !status.external_addrs.is_empty() => status.external_addrs,
2007                _ => match network.routable_addr().await {
2008                    Some(addr) => vec![addr],
2009                    None => self.announcement_addresses(),
2010                },
2011            }
2012        } else {
2013            self.announcement_addresses()
2014        };
2015        // Detect addresses locally via UDP socket tricks.
2016        // ant-quic discovers public IPv4 via OBSERVED_ADDRESS from peers.
2017        // IPv6 is globally routable (no NAT), so we probe locally.
2018        //
2019        // For locally-probed addresses (IPv6 and LAN IPv4), use the actual
2020        // bound port from the QUIC endpoint — NOT the first external address
2021        // port (which is NAT-mapped) and NOT the config bind port (which may
2022        // be 0 for OS-assigned ports).
2023        let bind_port = if let Some(network) = self.network.as_ref() {
2024            network.bound_addr().await.map(|a| a.port()).unwrap_or(5483)
2025        } else {
2026            5483
2027        };
2028
2029        // IPv6 probe
2030        if let Ok(sock) = std::net::UdpSocket::bind("[::]:0") {
2031            if sock.connect("[2001:4860:4860::8888]:80").is_ok() {
2032                if let Ok(local) = sock.local_addr() {
2033                    if let std::net::IpAddr::V6(v6) = local.ip() {
2034                        let segs = v6.segments();
2035                        let is_global = (segs[0] & 0xffc0) != 0xfe80
2036                            && (segs[0] & 0xff00) != 0xfd00
2037                            && !v6.is_loopback();
2038                        if is_global {
2039                            let v6_addr =
2040                                std::net::SocketAddr::new(std::net::IpAddr::V6(v6), bind_port);
2041                            if !addresses.contains(&v6_addr) {
2042                                addresses.push(v6_addr);
2043                            }
2044                        }
2045                    }
2046                }
2047            }
2048        }
2049
2050        for addr in collect_local_interface_addrs(bind_port) {
2051            if !addresses.contains(&addr) {
2052                addresses.push(addr);
2053            }
2054        }
2055
2056        // Same rule as HeartbeatContext::announce() — gossip is global, so
2057        // LAN-scope addresses must never be published here. See the scope
2058        // analysis report under tests/proof-reports/ for details.
2059        addresses.retain(|a| is_publicly_advertisable(*a));
2060
2061        let announcement = self.build_identity_announcement_with_addrs(
2062            include_user_identity,
2063            human_consent,
2064            addresses,
2065        )?;
2066
2067        let encoded = bincode::serialize(&announcement).map_err(|e| {
2068            error::IdentityError::Serialization(format!(
2069                "failed to serialize identity announcement: {e}"
2070            ))
2071        })?;
2072
2073        let payload = bytes::Bytes::from(encoded);
2074
2075        // Publish to shard topic first (future-proof routing).
2076        let shard_topic = shard_topic_for_agent(&announcement.agent_id);
2077        runtime
2078            .pubsub()
2079            .publish(shard_topic, payload.clone())
2080            .await
2081            .map_err(|e| {
2082                error::IdentityError::Storage(std::io::Error::other(format!(
2083                    "failed to publish identity announcement to shard topic: {e}"
2084                )))
2085            })?;
2086
2087        // Also publish to legacy broadcast topic for backward compatibility.
2088        runtime
2089            .pubsub()
2090            .publish(IDENTITY_ANNOUNCE_TOPIC.to_string(), payload)
2091            .await
2092            .map_err(|e| {
2093                error::IdentityError::Storage(std::io::Error::other(format!(
2094                    "failed to publish identity announcement: {e}"
2095                )))
2096            })?;
2097
2098        let now = Self::unix_timestamp_secs();
2099        self.identity_discovery_cache.write().await.insert(
2100            announcement.agent_id,
2101            DiscoveredAgent {
2102                agent_id: announcement.agent_id,
2103                machine_id: announcement.machine_id,
2104                user_id: announcement.user_id,
2105                addresses: announcement.addresses.clone(),
2106                announced_at: announcement.announced_at,
2107                last_seen: now,
2108                machine_public_key: announcement.machine_public_key.clone(),
2109                nat_type: announcement.nat_type.clone(),
2110                can_receive_direct: announcement.can_receive_direct,
2111                is_relay: announcement.is_relay,
2112                is_coordinator: announcement.is_coordinator,
2113            },
2114        );
2115
2116        // Record consent AFTER successful publish so heartbeats don't start
2117        // including user identity if this announcement never actually propagated.
2118        if include_user_identity && human_consent {
2119            self.user_identity_consented
2120                .store(true, std::sync::atomic::Ordering::Release);
2121        }
2122
2123        Ok(())
2124    }
2125
2126    /// Get all discovered agents from identity announcements.
2127    ///
2128    /// # Errors
2129    ///
2130    /// Returns an error if the gossip runtime is not initialized.
2131    pub async fn discovered_agents(&self) -> error::Result<Vec<DiscoveredAgent>> {
2132        self.start_identity_listener().await?;
2133        let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
2134        let mut agents: Vec<_> = self
2135            .identity_discovery_cache
2136            .read()
2137            .await
2138            .values()
2139            .filter(|a| a.announced_at >= cutoff)
2140            .cloned()
2141            .collect();
2142        agents.sort_by_key(|a| a.agent_id.0);
2143        Ok(agents)
2144    }
2145
2146    /// Return all discovered agents regardless of TTL.
2147    ///
2148    /// Unlike [`Self::discovered_agents`], this method skips TTL filtering and
2149    /// returns all cache entries, including stale ones. Useful for debugging.
2150    ///
2151    /// # Errors
2152    ///
2153    /// Returns an error if the gossip runtime is not initialized.
2154    pub async fn discovered_agents_unfiltered(&self) -> error::Result<Vec<DiscoveredAgent>> {
2155        self.start_identity_listener().await?;
2156        let mut agents: Vec<_> = self
2157            .identity_discovery_cache
2158            .read()
2159            .await
2160            .values()
2161            .cloned()
2162            .collect();
2163        agents.sort_by_key(|a| a.agent_id.0);
2164        Ok(agents)
2165    }
2166
2167    /// Get one discovered agent record by agent ID.
2168    ///
2169    /// # Errors
2170    ///
2171    /// Returns an error if the gossip runtime is not initialized.
2172    pub async fn discovered_agent(
2173        &self,
2174        agent_id: identity::AgentId,
2175    ) -> error::Result<Option<DiscoveredAgent>> {
2176        self.start_identity_listener().await?;
2177        Ok(self
2178            .identity_discovery_cache
2179            .read()
2180            .await
2181            .get(&agent_id)
2182            .cloned())
2183    }
2184
2185    async fn start_identity_listener(&self) -> error::Result<()> {
2186        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2187            error::IdentityError::Storage(std::io::Error::other(
2188                "gossip runtime not initialized - configure agent with network first",
2189            ))
2190        })?;
2191
2192        if self
2193            .identity_listener_started
2194            .swap(true, std::sync::atomic::Ordering::AcqRel)
2195        {
2196            return Ok(());
2197        }
2198
2199        let mut sub_legacy = runtime
2200            .pubsub()
2201            .subscribe(IDENTITY_ANNOUNCE_TOPIC.to_string())
2202            .await;
2203        let own_shard_topic = shard_topic_for_agent(&self.agent_id());
2204        let mut sub_shard = runtime.pubsub().subscribe(own_shard_topic).await;
2205        let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
2206        let bootstrap_cache = self.bootstrap_cache.clone();
2207        let contact_store = std::sync::Arc::clone(&self.contact_store);
2208        let direct_messaging = std::sync::Arc::clone(&self.direct_messaging);
2209        let network = self.network.as_ref().map(std::sync::Arc::clone);
2210        let own_agent_id = self.agent_id();
2211        let rebroadcast_pubsub = std::sync::Arc::clone(runtime.pubsub());
2212
2213        tokio::spawn(async move {
2214            // Track agents we've already initiated auto-connect to, preventing
2215            // duplicate connection attempts from concurrent announcements.
2216            let mut auto_connect_attempted = std::collections::HashSet::<identity::AgentId>::new();
2217
2218            // Time-windowed dedup for re-broadcast: (agent_id, announced_at)
2219            // → last-rebroadcast Instant. Bounds how often we re-emit any
2220            // given announcement so a busy mesh doesn't amplify gossip
2221            // indefinitely. Mirrors the pattern used by the release-manifest
2222            // re-broadcast loop in x0xd.
2223            let mut rebroadcast_state: std::collections::HashMap<
2224                (identity::AgentId, u64),
2225                std::time::Instant,
2226            > = std::collections::HashMap::new();
2227            const REBROADCAST_MIN_INTERVAL: std::time::Duration =
2228                std::time::Duration::from_secs(20);
2229
2230            loop {
2231                // Drain whichever subscription fires next; deduplicate by AgentId in cache.
2232                let msg = tokio::select! {
2233                    Some(m) = sub_legacy.recv() => m,
2234                    Some(m) = sub_shard.recv() => m,
2235                    else => break,
2236                };
2237                let decoded = {
2238                    use bincode::Options;
2239                    bincode::options()
2240                        .with_fixint_encoding()
2241                        .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
2242                        .allow_trailing_bytes()
2243                        .deserialize::<IdentityAnnouncement>(&msg.payload)
2244                };
2245                let raw_payload = msg.payload.clone();
2246                let announcement = match decoded {
2247                    Ok(a) => a,
2248                    Err(e) => {
2249                        tracing::debug!("Ignoring invalid identity announcement payload: {}", e);
2250                        continue;
2251                    }
2252                };
2253
2254                if let Err(e) = announcement.verify() {
2255                    tracing::warn!("Ignoring unverifiable identity announcement: {}", e);
2256                    continue;
2257                }
2258
2259                // Evaluate trust for this (agent, machine) pair.
2260                // Blocked or machine-pinning violations are silently dropped.
2261                {
2262                    let store = contact_store.read().await;
2263                    let evaluator = trust::TrustEvaluator::new(&store);
2264                    let decision = evaluator.evaluate(&trust::TrustContext {
2265                        agent_id: &announcement.agent_id,
2266                        machine_id: &announcement.machine_id,
2267                    });
2268                    match decision {
2269                        trust::TrustDecision::RejectBlocked => {
2270                            tracing::debug!(
2271                                "Dropping identity announcement from blocked agent {:?}",
2272                                hex::encode(&announcement.agent_id.0[..8]),
2273                            );
2274                            continue;
2275                        }
2276                        trust::TrustDecision::RejectMachineMismatch => {
2277                            tracing::warn!(
2278                                "Dropping identity announcement from agent {:?}: machine {:?} not in pinned list",
2279                                hex::encode(&announcement.agent_id.0[..8]),
2280                                hex::encode(&announcement.machine_id.0[..8]),
2281                            );
2282                            continue;
2283                        }
2284                        _ => {}
2285                    }
2286                }
2287
2288                // Update machine records in the contact store.
2289                {
2290                    let mut store = contact_store.write().await;
2291                    let record = contacts::MachineRecord::new(announcement.machine_id, None);
2292                    store.add_machine(&announcement.agent_id, record);
2293                }
2294
2295                let now = std::time::SystemTime::now()
2296                    .duration_since(std::time::UNIX_EPOCH)
2297                    .map_or(0, |d| d.as_secs());
2298
2299                // Add only globally-routable addresses to the persistent
2300                // bootstrap cache. Private/LAN addresses are kept in the
2301                // ephemeral discovery cache (below) for same-network
2302                // connectivity, but must not persist across restarts where
2303                // they become stale dead-ends for remote nodes.
2304                {
2305                    let public_addrs: Vec<std::net::SocketAddr> = announcement
2306                        .addresses
2307                        .iter()
2308                        .copied()
2309                        .filter(|a| is_globally_routable(a.ip()))
2310                        .collect();
2311                    if !public_addrs.is_empty() {
2312                        if let Some(ref bc) = &bootstrap_cache {
2313                            let peer_id = ant_quic::PeerId(announcement.machine_id.0);
2314                            bc.add_from_connection(peer_id, public_addrs.clone(), None)
2315                                .await;
2316                            tracing::debug!(
2317                                "Added {} public addresses to bootstrap cache for agent {:?} (machine {:?})",
2318                                public_addrs.len(),
2319                                announcement.agent_id,
2320                                hex::encode(&announcement.machine_id.0[..8]),
2321                            );
2322                        }
2323                    }
2324                }
2325
2326                // Cache the announcement with its address list filtered to
2327                // globally-advertisable scope only. Legacy peers that still
2328                // ship RFC1918/ULA/loopback entries in their announcements
2329                // must not force us to keep dialing their unreachable LAN
2330                // addresses — LAN discovery is ant-quic's mDNS job. Empty
2331                // address lists are preserved (the `AlreadyConnected` path in
2332                // `connect_to_agent` handles gossip peers we only reach by
2333                // an existing QUIC connection).
2334                let filtered_addresses: Vec<std::net::SocketAddr> = announcement
2335                    .addresses
2336                    .iter()
2337                    .copied()
2338                    .filter(|a| is_publicly_advertisable(*a))
2339                    .collect();
2340                cache.write().await.insert(
2341                    announcement.agent_id,
2342                    DiscoveredAgent {
2343                        agent_id: announcement.agent_id,
2344                        machine_id: announcement.machine_id,
2345                        user_id: announcement.user_id,
2346                        addresses: filtered_addresses,
2347                        announced_at: announcement.announced_at,
2348                        last_seen: now,
2349                        machine_public_key: announcement.machine_public_key.clone(),
2350                        nat_type: announcement.nat_type.clone(),
2351                        can_receive_direct: announcement.can_receive_direct,
2352                        is_relay: announcement.is_relay,
2353                        is_coordinator: announcement.is_coordinator,
2354                    },
2355                );
2356
2357                // Identity announcements are the strongest agent↔machine binding we have.
2358                // Register the mapping immediately so reverse direct-send can resolve the
2359                // machine even before the first inbound direct payload arrives.
2360                direct_messaging
2361                    .register_agent(announcement.agent_id, announcement.machine_id)
2362                    .await;
2363
2364                // Epidemic re-broadcast — mirrors the release-manifest
2365                // re-broadcast pattern. Bootstrap-node meshes have patchy
2366                // PlumTree overlap for the identity-announce topic: the
2367                // origin's tree only reaches 1–2 hops reliably. Making
2368                // every verified recipient re-publish guarantees flood
2369                // convergence across the mesh. Dedup-window on
2370                // (agent_id, announced_at) bounds amplification. Pub/Sub
2371                // v2 re-signs each publish with a new message ID so
2372                // PlumTree's own dedup cannot suppress our forward.
2373                if announcement.agent_id != own_agent_id {
2374                    let key = (announcement.agent_id, announcement.announced_at);
2375                    let should_forward = match rebroadcast_state.get(&key) {
2376                        None => true,
2377                        Some(last) => last.elapsed() >= REBROADCAST_MIN_INTERVAL,
2378                    };
2379                    if should_forward {
2380                        rebroadcast_state.insert(key, std::time::Instant::now());
2381                        // Prune stale entries to cap memory.
2382                        if rebroadcast_state.len() > 1024 {
2383                            let cutoff =
2384                                std::time::Instant::now() - std::time::Duration::from_secs(3600);
2385                            rebroadcast_state.retain(|_, t| *t >= cutoff);
2386                        }
2387                        let pubsub = std::sync::Arc::clone(&rebroadcast_pubsub);
2388                        let payload = raw_payload.clone();
2389                        tokio::spawn(async move {
2390                            if let Err(e) = pubsub
2391                                .publish(IDENTITY_ANNOUNCE_TOPIC.to_string(), payload)
2392                                .await
2393                            {
2394                                tracing::debug!("identity announcement re-broadcast failed: {e}");
2395                            }
2396                        });
2397                    }
2398                }
2399
2400                // Reconcile the agent-level direct-message registry if the transport peer
2401                // is already connected (for example an inbound accept that happened before
2402                // this announcement reached us).
2403                if let Some(ref net) = &network {
2404                    let ant_peer_id = ant_quic::PeerId(announcement.machine_id.0);
2405                    if net.is_connected(&ant_peer_id).await {
2406                        direct_messaging
2407                            .mark_connected(announcement.agent_id, announcement.machine_id)
2408                            .await;
2409                    }
2410                }
2411
2412                // Auto-connect to discovered agents so pub/sub messages can route
2413                // between peers that share bootstrap nodes but aren't directly connected.
2414                // The gossip topology refresh (every 1s) will add the new peer to
2415                // PlumTree topic trees once the QUIC connection is established.
2416                if announcement.agent_id != own_agent_id
2417                    && !announcement.addresses.is_empty()
2418                    && !auto_connect_attempted.contains(&announcement.agent_id)
2419                {
2420                    if let Some(ref net) = &network {
2421                        let ant_peer = ant_quic::PeerId(announcement.machine_id.0);
2422                        if !net.is_connected(&ant_peer).await {
2423                            auto_connect_attempted.insert(announcement.agent_id);
2424                            let net = std::sync::Arc::clone(net);
2425                            let addresses = announcement.addresses.clone();
2426                            tokio::spawn(async move {
2427                                for addr in &addresses {
2428                                    match net.connect_addr(*addr).await {
2429                                        Ok(_) => {
2430                                            tracing::info!(
2431                                                "Auto-connected to discovered agent at {addr}",
2432                                            );
2433                                            return;
2434                                        }
2435                                        Err(e) => {
2436                                            tracing::debug!("Auto-connect to {addr} failed: {e}",);
2437                                        }
2438                                    }
2439                                }
2440                                tracing::debug!(
2441                                    "Auto-connect exhausted all {} addresses for discovered agent",
2442                                    addresses.len(),
2443                                );
2444                            });
2445                        }
2446                    }
2447                }
2448            }
2449        });
2450
2451        Ok(())
2452    }
2453
2454    fn unix_timestamp_secs() -> u64 {
2455        std::time::SystemTime::now()
2456            .duration_since(std::time::UNIX_EPOCH)
2457            .map_or(0, |d| d.as_secs())
2458    }
2459
2460    fn announcement_addresses(&self) -> Vec<std::net::SocketAddr> {
2461        match self.network.as_ref().and_then(|n| n.local_addr()) {
2462            Some(addr) if addr.port() > 0 => collect_local_interface_addrs(addr.port()),
2463            _ => Vec::new(),
2464        }
2465    }
2466
2467    fn build_identity_announcement(
2468        &self,
2469        include_user_identity: bool,
2470        human_consent: bool,
2471    ) -> error::Result<IdentityAnnouncement> {
2472        self.build_identity_announcement_with_addrs(
2473            include_user_identity,
2474            human_consent,
2475            self.announcement_addresses(),
2476        )
2477    }
2478
2479    fn build_identity_announcement_with_addrs(
2480        &self,
2481        include_user_identity: bool,
2482        human_consent: bool,
2483        addresses: Vec<std::net::SocketAddr>,
2484    ) -> error::Result<IdentityAnnouncement> {
2485        if include_user_identity && !human_consent {
2486            return Err(error::IdentityError::Storage(std::io::Error::other(
2487                "human identity disclosure requires explicit human consent — set human_consent: true in the request body",
2488            )));
2489        }
2490
2491        let (user_id, agent_certificate) = if include_user_identity {
2492            let user_id = self.user_id().ok_or_else(|| {
2493                error::IdentityError::Storage(std::io::Error::other(
2494                    "human identity disclosure requested but no user identity is configured — set user_key_path in your config.toml to point at your user keypair file",
2495                ))
2496            })?;
2497            let cert = self.agent_certificate().cloned().ok_or_else(|| {
2498                error::IdentityError::Storage(std::io::Error::other(
2499                    "human identity disclosure requested but agent certificate is missing",
2500                ))
2501            })?;
2502            (Some(user_id), Some(cert))
2503        } else {
2504            (None, None)
2505        };
2506
2507        let machine_public_key = self
2508            .identity
2509            .machine_keypair()
2510            .public_key()
2511            .as_bytes()
2512            .to_vec();
2513
2514        // NAT status is populated by the heartbeat loop (which is async and has
2515        // access to NodeStatus). Here we use None for the NAT fields as this
2516        // sync builder doesn't have async access to the network layer.
2517        let unsigned = IdentityAnnouncementUnsigned {
2518            agent_id: self.agent_id(),
2519            machine_id: self.machine_id(),
2520            user_id,
2521            agent_certificate: agent_certificate.clone(),
2522            machine_public_key: machine_public_key.clone(),
2523            addresses,
2524            announced_at: Self::unix_timestamp_secs(),
2525            nat_type: None,
2526            can_receive_direct: None,
2527            is_relay: None,
2528            is_coordinator: None,
2529        };
2530        let unsigned_bytes = bincode::serialize(&unsigned).map_err(|e| {
2531            error::IdentityError::Serialization(format!(
2532                "failed to serialize unsigned identity announcement: {e}"
2533            ))
2534        })?;
2535        let machine_signature = ant_quic::crypto::raw_public_keys::pqc::sign_with_ml_dsa(
2536            self.identity.machine_keypair().secret_key(),
2537            &unsigned_bytes,
2538        )
2539        .map_err(|e| {
2540            error::IdentityError::Storage(std::io::Error::other(format!(
2541                "failed to sign identity announcement with machine key: {:?}",
2542                e
2543            )))
2544        })?
2545        .as_bytes()
2546        .to_vec();
2547
2548        Ok(IdentityAnnouncement {
2549            agent_id: unsigned.agent_id,
2550            machine_id: unsigned.machine_id,
2551            user_id: unsigned.user_id,
2552            agent_certificate: unsigned.agent_certificate,
2553            machine_public_key,
2554            machine_signature,
2555            addresses: unsigned.addresses,
2556            announced_at: unsigned.announced_at,
2557            nat_type: unsigned.nat_type,
2558            can_receive_direct: unsigned.can_receive_direct,
2559            is_relay: unsigned.is_relay,
2560            is_coordinator: unsigned.is_coordinator,
2561        })
2562    }
2563
    /// Join the x0x gossip network.
    ///
    /// If the agent was not configured with a network, this method
    /// succeeds gracefully (nothing to join).
    ///
    /// Otherwise it runs these steps, in this order:
    ///
    /// 1. Start the gossip runtime (if configured), then the identity,
    ///    network-event and direct-message listeners.
    /// 2. Connect to peers in up to three phases:
    ///    - Phase 0 tries up to 6 cached coordinators whenever a bootstrap
    ///      cache exists.
    ///    - Phase 1 tries up to 12 cached peers, and runs only while fewer
    ///      than 3 addresses are connected.
    ///    - Phase 2 tries the configured bootstrap nodes under the same
    ///      condition. Its connections are made in parallel, and failed
    ///      ones are retried twice, after 10 s and then 15 s, to allow
    ///      stale connections on remote nodes to expire. When bootstrap
    ///      nodes fail, this call therefore blocks for those delays.
    /// 3. Join the HyParView membership overlay using the connected addresses.
    /// 4. If presence is configured: seed broadcast peers, set address hints,
    ///    start beacons (if enabled) and start the presence event loop.
    /// 5. Publish an initial identity announcement, start the identity
    ///    heartbeat, and schedule a fresh re-announcement 3 s later.
    /// 6. Start (or restart) the capability advert service if a gossip
    ///    runtime exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the gossip runtime fails to start or the identity
    /// listener cannot be started (e.g. when no gossip runtime is configured).
    /// Failures in later steps are logged and do not fail the call. These
    /// include bootstrap connections, the membership join, presence,
    /// announcements, the heartbeat and the capability advert.
    pub async fn join_network(&self) -> error::Result<()> {
        let Some(network) = self.network.as_ref() else {
            tracing::debug!("join_network called but no network configured");
            return Ok(());
        };

        if let Some(ref runtime) = self.gossip_runtime {
            runtime.start().await.map_err(|e| {
                error::IdentityError::Storage(std::io::Error::other(format!(
                    "failed to start gossip runtime: {e}"
                )))
            })?;
            tracing::info!("Gossip runtime started");
        }
        // Listeners are started before any peer is dialed below. This `?` is
        // the last point at which join_network can fail; every later error is
        // only logged.
        self.start_identity_listener().await?;
        self.start_network_event_listener();
        self.start_direct_listener();

        let bootstrap_nodes = network.config().bootstrap_nodes.clone();

        // Phases 1 and 2 are skipped once `all_connected` holds at least
        // `min_connected` entries.
        // NOTE(review): `all_connected` counts addresses, not distinct peers.
        // A peer reached on several addresses, or an address connected in
        // both phase 0 and phase 1, counts more than once. Confirm that is
        // intended.
        let min_connected = 3;
        let mut all_connected: Vec<std::net::SocketAddr> = Vec::new();

        // ant-quic now owns first-party mDNS discovery and auto-connect.
        // x0x keeps bootstrap/cache orchestration here, while the transport
        // layer handles zero-config LAN discovery internally.

        // Phase 0: Try quality-scored coordinator peers from bootstrap cache.
        // The bootstrap cache learns about coordinator-capable peers passively
        // through normal connections — no coordinator gossip topic needed.
        // This phase is not gated on `min_connected`; it runs whenever a
        // bootstrap cache exists and dials every preferred address of up to
        // 6 selected coordinators.
        if let Some(ref cache) = self.bootstrap_cache {
            let coordinators = cache.select_coordinators(6).await;
            let coordinator_addrs: Vec<std::net::SocketAddr> = coordinators
                .iter()
                .flat_map(|peer| peer.preferred_addresses())
                .collect();

            if !coordinator_addrs.is_empty() {
                tracing::info!(
                    "Phase 0: Trying {} addresses from {} cached coordinators",
                    coordinator_addrs.len(),
                    coordinators.len()
                );
                let (succeeded, _failed) = self
                    .connect_peers_parallel_tracked(network, &coordinator_addrs)
                    .await;
                all_connected.extend(&succeeded);
                tracing::info!(
                    "Phase 0: {}/{} coordinator addresses connected",
                    succeeded.len(),
                    coordinator_addrs.len()
                );
            }
        }

        // Phase 1: Try cached peers first using the real ant-quic peer IDs.
        if all_connected.len() < min_connected {
            if let Some(ref cache) = self.bootstrap_cache {
                const PHASE1_PEER_CANDIDATES: usize = 12;
                let cached_peers = cache.select_peers(PHASE1_PEER_CANDIDATES).await;
                if !cached_peers.is_empty() {
                    tracing::info!("Phase 1: Trying {} cached peers", cached_peers.len());
                    let (succeeded, _failed) = self
                        .connect_cached_peers_parallel_tracked(network, &cached_peers)
                        .await;
                    all_connected.extend(&succeeded);
                    tracing::info!(
                        "Phase 1: {}/{} cached peers connected",
                        succeeded.len(),
                        cached_peers.len()
                    );
                }
            }
        } // end Phase 1 min_connected check

        // Phase 2: Connect to hardcoded bootstrap nodes if we need more peers.
        // This is the fallback for when coordinator cache and cached peers aren't enough.
        if all_connected.len() < min_connected && !bootstrap_nodes.is_empty() {
            // Skip bootstrap addresses that phases 0/1 already connected.
            let remaining: Vec<std::net::SocketAddr> = bootstrap_nodes
                .iter()
                .filter(|addr| !all_connected.contains(addr))
                .copied()
                .collect();

            // Round 1: Connect to all bootstrap peers in parallel
            let (succeeded, mut failed) = self
                .connect_peers_parallel_tracked(network, &remaining)
                .await;
            all_connected.extend(&succeeded);
            tracing::info!(
                "Phase 2 round 1: {}/{} bootstrap peers connected",
                succeeded.len(),
                remaining.len()
            );

            // Retry rounds for failed peers: sleep 10s before round 2 and 15s
            // before round 3. These sleeps happen inline, so join_network does
            // not return until the retries finish (or nothing is left to retry).
            for round in 2..=3 {
                if failed.is_empty() {
                    break;
                }
                let delay = std::time::Duration::from_secs(if round == 2 { 10 } else { 15 });
                tracing::info!(
                    "Retrying {} failed peers in {}s (round {})",
                    failed.len(),
                    delay.as_secs(),
                    round
                );
                tokio::time::sleep(delay).await;

                let (succeeded, still_failed) =
                    self.connect_peers_parallel_tracked(network, &failed).await;
                all_connected.extend(&succeeded);
                failed = still_failed;
                tracing::info!(
                    "Phase 2 round {}: {} total peers connected",
                    round,
                    all_connected.len()
                );
            }

            // Peers still unreachable after all rounds are logged, not returned as an error.
            if !failed.is_empty() {
                tracing::warn!(
                    "Could not connect to {} bootstrap peers: {:?}",
                    failed.len(),
                    failed
                );
            }
        }

        tracing::info!(
            "Network join complete. Connected to {} peers.",
            all_connected.len()
        );

        // Join the HyParView membership overlay via connected peers.
        // Seeds are the connected socket addresses rendered as strings. The
        // join is skipped when nothing connected, and a failure is only logged.
        if let Some(ref runtime) = self.gossip_runtime {
            let seeds: Vec<String> = all_connected.iter().map(|addr| addr.to_string()).collect();
            if !seeds.is_empty() {
                if let Err(e) = runtime.membership().join(seeds).await {
                    tracing::warn!("HyParView membership join failed: {e}");
                }
            }
        }

        // Start presence beacons after membership overlay is established.
        if let Some(ref pw) = self.presence {
            // Seed broadcast peers from connected peers so beacons propagate.
            // The seeds are taken from the membership active view.
            if let Some(ref runtime) = self.gossip_runtime {
                let active = runtime.membership().active_view();
                for peer in active {
                    pw.manager().add_broadcast_peer(peer).await;
                }
                tracing::info!(
                    "Presence seeded with {} broadcast peers",
                    pw.manager().broadcast_peer_count().await
                );
            }

            // Populate address hints from network status for beacon metadata.
            //
            // Presence beacons propagate over global gossip — filter to only
            // globally-advertisable addresses. LAN discovery is ant-quic's
            // mDNS job, not ours. Also exclude `status.local_addr` which is
            // typically the wildcard `[::]:5483` and never a dialable target.
            if let Some(ref net) = self.network {
                if let Some(status) = net.node_status().await {
                    let hints: Vec<String> = status
                        .external_addrs
                        .iter()
                        .filter(|a| is_publicly_advertisable(**a))
                        .map(|a| a.to_string())
                        .collect();
                    pw.manager().set_addr_hints(hints).await;
                }
            }

            if pw.config().enable_beacons {
                if let Err(e) = pw
                    .manager()
                    .start_beacons(pw.config().beacon_interval_secs)
                    .await
                {
                    tracing::warn!("Failed to start presence beacons: {e}");
                } else {
                    tracing::info!(
                        "Presence beacons started (interval={}s)",
                        pw.config().beacon_interval_secs
                    );
                }
            }

            // Start the presence event-emission loop so that subscribers
            // automatically receive AgentOnline/AgentOffline events after
            // join_network() returns.
            pw.start_event_loop(std::sync::Arc::clone(&self.identity_discovery_cache))
                .await;
            tracing::debug!("Presence event loop started");
        }

        // Initial announcement and heartbeat failures are logged only.
        // NOTE(review): the `(false, false)` arguments presumably mean "no
        // user-identity disclosure, no human consent", mirroring
        // `build_identity_announcement`. Confirm against `announce_identity`.
        if let Err(e) = self.announce_identity(false, false).await {
            tracing::warn!("Initial identity announcement failed: {}", e);
        }
        if let Err(e) = self.start_identity_heartbeat().await {
            tracing::warn!("Failed to start identity heartbeat: {e}");
        }

        // Schedule a fresh re-announcement after gossip topology stabilizes.
        // The initial publish fires before PlumTree has formed eager-push links,
        // so peers that connected after the first announce won't see it.
        // A fresh announcement (new message ID) is required because PlumTree
        // deduplicates by message ID — replaying identical bytes would be silently
        // dropped by peers that already received the first announcement.
        // The task is detached: its JoinHandle is not kept.
        if let (Some(ref runtime), Some(ref network)) = (&self.gossip_runtime, &self.network) {
            let ctx = HeartbeatContext {
                identity: std::sync::Arc::clone(&self.identity),
                runtime: std::sync::Arc::clone(runtime),
                network: std::sync::Arc::clone(network),
                interval_secs: self.heartbeat_interval_secs,
                cache: std::sync::Arc::clone(&self.identity_discovery_cache),
                user_identity_consented: std::sync::Arc::clone(&self.user_identity_consented),
            };
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_secs(3)).await;
                if let Err(e) = ctx.announce().await {
                    tracing::warn!("Delayed identity re-announcement failed: {e}");
                } else {
                    tracing::info!(
                        "Delayed identity re-announcement sent (gossip mesh stabilized)"
                    );
                }
            });
        }

        // Start the capability advert service. Until start_dm_inbox runs,
        // the advert declares gossip_inbox=false — senders fall back to
        // raw-QUIC. Once upgraded, peers learn about gossip DM support.
        // A previously running advert service (e.g. from an earlier
        // join_network call) is aborted and replaced.
        if let Some(ref runtime) = self.gossip_runtime {
            let signing = std::sync::Arc::new(gossip::SigningContext::from_keypair(
                self.identity.agent_keypair(),
            ));
            let caps_rx = self.dm_capabilities_tx.subscribe();
            match dm_capability_service::CapabilityAdvertService::spawn_default(
                std::sync::Arc::clone(runtime.pubsub()),
                signing,
                self.identity.agent_id(),
                self.identity.machine_id(),
                caps_rx,
                std::sync::Arc::clone(&self.capability_store),
            )
            .await
            {
                Ok(service) => {
                    let mut guard = self.capability_advert_service.lock().await;
                    if let Some(prev) = guard.take() {
                        prev.abort();
                    }
                    *guard = Some(service);
                    tracing::info!("Capability advert service started");
                }
                Err(e) => tracing::warn!("failed to start capability advert service: {e}"),
            }
        }

        Ok(())
    }
2837
2838    /// Clone the shared capability store.
2839    #[must_use]
2840    pub fn capability_store(&self) -> std::sync::Arc<dm_capability::CapabilityStore> {
2841        std::sync::Arc::clone(&self.capability_store)
2842    }
2843
2844    /// Clone the shared DM in-flight ACK registry.
2845    #[must_use]
2846    pub fn dm_inflight_acks(&self) -> std::sync::Arc<dm::InFlightAcks> {
2847        std::sync::Arc::clone(&self.dm_inflight_acks)
2848    }
2849
2850    /// Clone the shared recent-delivery dedupe cache.
2851    #[must_use]
2852    pub fn recent_delivery_cache(&self) -> std::sync::Arc<dm::RecentDeliveryCache> {
2853        std::sync::Arc::clone(&self.recent_delivery_cache)
2854    }
2855
2856    /// Spawn the DM inbox service backed by the given KEM keypair.
2857    /// Idempotent — the prior service is aborted before spawning new.
2858    ///
2859    /// # Errors
2860    ///
2861    /// Returns an error if no gossip runtime is configured.
2862    pub async fn start_dm_inbox(
2863        &self,
2864        kem_keypair: std::sync::Arc<groups::kem_envelope::AgentKemKeypair>,
2865        config: dm_inbox::DmInboxConfig,
2866    ) -> error::Result<()> {
2867        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
2868            error::IdentityError::Storage(std::io::Error::other(
2869                "cannot start DM inbox: no gossip runtime configured",
2870            ))
2871        })?;
2872        let signing = std::sync::Arc::new(gossip::SigningContext::from_keypair(
2873            self.identity.agent_keypair(),
2874        ));
2875        let service = dm_inbox::DmInboxService::spawn(
2876            std::sync::Arc::clone(runtime.pubsub()),
2877            signing,
2878            self.identity.agent_id(),
2879            self.identity.machine_id(),
2880            std::sync::Arc::clone(&kem_keypair),
2881            std::sync::Arc::clone(&self.direct_messaging),
2882            std::sync::Arc::clone(&self.contact_store),
2883            std::sync::Arc::clone(&self.dm_inflight_acks),
2884            std::sync::Arc::clone(&self.recent_delivery_cache),
2885            config,
2886        )
2887        .await
2888        .map_err(|e| {
2889            error::IdentityError::Storage(std::io::Error::other(format!(
2890                "DM inbox spawn failed: {e}"
2891            )))
2892        })?;
2893        let mut guard = self.dm_inbox_service.lock().await;
2894        if let Some(prev) = guard.take() {
2895            prev.abort();
2896        }
2897        *guard = Some(service);
2898
2899        // Upgrade our advertised capabilities so peers stop falling back
2900        // to the raw-QUIC path. The capability advert service watches
2901        // this channel and republishes immediately on change.
2902        let upgraded =
2903            dm::DmCapabilities::pending().with_kem_public_key(kem_keypair.public_bytes.clone());
2904        if self.dm_capabilities_tx.send(upgraded).is_err() {
2905            tracing::debug!("dm_capabilities watch has no receivers; skipping upgrade broadcast");
2906        }
2907        tracing::info!("DM inbox service started");
2908        Ok(())
2909    }
2910
2911    /// Stop the DM inbox service, if running. Idempotent.
2912    pub async fn stop_dm_inbox(&self) {
2913        let mut guard = self.dm_inbox_service.lock().await;
2914        if let Some(service) = guard.take() {
2915            service.abort();
2916        }
2917    }
2918
2919    /// Connect to cached peers in parallel, returning (succeeded, failed) peer lists.
2920    async fn connect_cached_peers_parallel_tracked(
2921        &self,
2922        network: &std::sync::Arc<network::NetworkNode>,
2923        peers: &[ant_quic::CachedPeer],
2924    ) -> (Vec<std::net::SocketAddr>, Vec<ant_quic::PeerId>) {
2925        use tokio::time::{timeout, Duration};
2926        const CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
2927
2928        let handles: Vec<_> = peers
2929            .iter()
2930            .map(|peer| {
2931                let net = network.clone();
2932                let peer_id = peer.peer_id;
2933                tokio::spawn(async move {
2934                    tracing::debug!("Connecting to cached peer: {:?}", peer_id);
2935                    match timeout(CONNECT_TIMEOUT, net.connect_cached_peer(peer_id)).await {
2936                        Ok(Ok(addr)) => {
2937                            tracing::info!("Connected to cached peer {:?} at {}", peer_id, addr);
2938                            Ok(addr)
2939                        }
2940                        Ok(Err(e)) => {
2941                            tracing::warn!("Failed to connect to cached peer {:?}: {}", peer_id, e);
2942                            Err(peer_id)
2943                        }
2944                        Err(_) => {
2945                            tracing::warn!(
2946                                "Connection to cached peer {:?} timed out after {}s",
2947                                peer_id,
2948                                CONNECT_TIMEOUT.as_secs()
2949                            );
2950                            Err(peer_id)
2951                        }
2952                    }
2953                })
2954            })
2955            .collect();
2956
2957        let mut succeeded = Vec::new();
2958        let mut failed = Vec::new();
2959        for handle in handles {
2960            match handle.await {
2961                Ok(Ok(addr)) => succeeded.push(addr),
2962                Ok(Err(peer_id)) => failed.push(peer_id),
2963                Err(e) => tracing::error!("Connection task panicked: {}", e),
2964            }
2965        }
2966        (succeeded, failed)
2967    }
2968
2969    /// Connect to multiple peers in parallel, returning (succeeded, failed) address lists.
2970    async fn connect_peers_parallel_tracked(
2971        &self,
2972        network: &std::sync::Arc<network::NetworkNode>,
2973        addrs: &[std::net::SocketAddr],
2974    ) -> (Vec<std::net::SocketAddr>, Vec<std::net::SocketAddr>) {
2975        use tokio::time::{timeout, Duration};
2976
2977        // Per-connection timeout prevents hanging when connecting to
2978        // ourselves or to unreachable addresses.
2979        const CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
2980
2981        let handles: Vec<_> = addrs
2982            .iter()
2983            .map(|addr| {
2984                let net = network.clone();
2985                let addr = *addr;
2986                tokio::spawn(async move {
2987                    tracing::debug!("Connecting to peer: {}", addr);
2988                    match timeout(CONNECT_TIMEOUT, net.connect_addr(addr)).await {
2989                        Ok(Ok(_)) => {
2990                            tracing::info!("Connected to peer: {}", addr);
2991                            Ok(addr)
2992                        }
2993                        Ok(Err(e)) => {
2994                            tracing::warn!("Failed to connect to {}: {}", addr, e);
2995                            Err(addr)
2996                        }
2997                        Err(_) => {
2998                            tracing::warn!(
2999                                "Connection to {} timed out after {}s",
3000                                addr,
3001                                CONNECT_TIMEOUT.as_secs()
3002                            );
3003                            Err(addr)
3004                        }
3005                    }
3006                })
3007            })
3008            .collect();
3009
3010        let mut succeeded = Vec::new();
3011        let mut failed = Vec::new();
3012        for handle in handles {
3013            match handle.await {
3014                Ok(Ok(addr)) => succeeded.push(addr),
3015                Ok(Err(addr)) => failed.push(addr),
3016                Err(e) => tracing::error!("Connection task panicked: {}", e),
3017            }
3018        }
3019        (succeeded, failed)
3020    }
3021
3022    /// Subscribe to messages on a given topic.
3023    ///
3024    /// Returns a [`gossip::Subscription`] that yields messages as they arrive
3025    /// through the gossip network.
3026    ///
3027    /// # Errors
3028    ///
3029    /// Returns an error if:
3030    /// - Gossip runtime is not initialized (configure agent with network first)
3031    pub async fn subscribe(&self, topic: &str) -> error::Result<Subscription> {
3032        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3033            error::IdentityError::Storage(std::io::Error::other(
3034                "gossip runtime not initialized - configure agent with network first",
3035            ))
3036        })?;
3037        Ok(runtime.pubsub().subscribe(topic.to_string()).await)
3038    }
3039
3040    /// Publish a message to a topic.
3041    ///
3042    /// The message will propagate through the gossip network via
3043    /// epidemic broadcast — every agent that receives it will
3044    /// relay it to its neighbours.
3045    ///
3046    /// # Errors
3047    ///
3048    /// Returns an error if:
3049    /// - Gossip runtime is not initialized (configure agent with network first)
3050    /// - Message encoding or broadcast fails
3051    pub async fn publish(&self, topic: &str, payload: Vec<u8>) -> error::Result<()> {
3052        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3053            error::IdentityError::Storage(std::io::Error::other(
3054                "gossip runtime not initialized - configure agent with network first",
3055            ))
3056        })?;
3057        runtime
3058            .pubsub()
3059            .publish(topic.to_string(), bytes::Bytes::from(payload))
3060            .await
3061            .map_err(|e| {
3062                error::IdentityError::Storage(std::io::Error::other(format!(
3063                    "publish failed: {}",
3064                    e
3065                )))
3066            })
3067    }
3068
3069    /// Get connected peer IDs.
3070    ///
3071    /// Returns the list of peers currently connected via the gossip network.
3072    ///
3073    /// # Errors
3074    ///
3075    /// Returns an error if the network is not initialized.
3076    pub async fn peers(&self) -> error::Result<Vec<saorsa_gossip_types::PeerId>> {
3077        let network = self.network.as_ref().ok_or_else(|| {
3078            error::IdentityError::Storage(std::io::Error::other(
3079                "network not initialized - configure agent with network first",
3080            ))
3081        })?;
3082        let ant_peers = network.connected_peers().await;
3083        Ok(ant_peers
3084            .into_iter()
3085            .map(|p| saorsa_gossip_types::PeerId::new(p.0))
3086            .collect())
3087    }
3088
3089    /// Get online agents.
3090    ///
3091    /// Returns agent IDs discovered from signed identity announcements.
3092    ///
3093    /// # Errors
3094    ///
3095    /// Returns an error if the gossip runtime is not initialized.
3096    pub async fn presence(&self) -> error::Result<Vec<identity::AgentId>> {
3097        self.start_identity_listener().await?;
3098        let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
3099        let mut agents: Vec<_> = self
3100            .identity_discovery_cache
3101            .read()
3102            .await
3103            .values()
3104            .filter(|a| a.announced_at >= cutoff)
3105            .map(|a| a.agent_id)
3106            .collect();
3107        agents.sort_by_key(|a| a.0);
3108        Ok(agents)
3109    }
3110
3111    /// Subscribe to presence events (agent online/offline notifications).
3112    ///
3113    /// Returns a [`tokio::sync::broadcast::Receiver<PresenceEvent>`] that yields
3114    /// [`presence::PresenceEvent`] values as agents come online or go offline.
3115    ///
3116    /// The diff-based event emission loop is started lazily on the first call to this
3117    /// method (or when [`join_network`](Agent::join_network) is called). Subsequent
3118    /// calls return independent receivers on the same broadcast channel.
3119    ///
3120    /// # Errors
3121    ///
3122    /// Returns [`error::NetworkError::NodeError`] if this agent was built
3123    /// without a network configuration (i.e. no `with_network_config` on the builder).
3124    pub async fn subscribe_presence(
3125        &self,
3126    ) -> error::NetworkResult<tokio::sync::broadcast::Receiver<presence::PresenceEvent>> {
3127        let pw = self.presence.as_ref().ok_or_else(|| {
3128            error::NetworkError::NodeError("presence system not initialized".to_string())
3129        })?;
3130        // Ensure the event loop is running.
3131        pw.start_event_loop(std::sync::Arc::clone(&self.identity_discovery_cache))
3132            .await;
3133        Ok(pw.subscribe_events())
3134    }
3135
3136    /// Look up a single agent in the local discovery cache.
3137    ///
3138    /// Returns `None` if the agent is not currently cached.  No network I/O is
3139    /// performed — use [`discover_agent_by_id`](Agent::discover_agent_by_id) for
3140    /// an active lookup that queries the network.
3141    pub async fn cached_agent(&self, id: &identity::AgentId) -> Option<DiscoveredAgent> {
3142        self.identity_discovery_cache.read().await.get(id).cloned()
3143    }
3144
3145    /// Check whether a claimed `AgentId` is verified as belonging to the
3146    /// given `MachineId` in the identity discovery cache.
3147    ///
3148    /// Returns `true` if the cache contains a signed identity announcement
3149    /// binding this agent to this machine. Returns `false` if the agent is
3150    /// unknown or bound to a different machine.
3151    pub async fn is_agent_machine_verified(
3152        &self,
3153        agent_id: &identity::AgentId,
3154        machine_id: &identity::MachineId,
3155    ) -> bool {
3156        let cache = self.identity_discovery_cache.read().await;
3157        cache
3158            .get(agent_id)
3159            .map(|entry| entry.machine_id == *machine_id)
3160            .unwrap_or(false)
3161    }
3162
3163    /// Discover agents via Friend-of-a-Friend (FOAF) random walk.
3164    ///
3165    /// Initiates a FOAF query on the global presence topic with the given `ttl`
3166    /// (maximum hop count) and `timeout_ms` (response collection window).
3167    ///
3168    /// Returned entries are resolved against the local identity discovery cache
3169    /// so that known agents are returned with full identity data.  Unknown peers
3170    /// are included with a minimal entry (addresses only) that will be enriched
3171    /// once their identity heartbeat arrives.
3172    ///
3173    /// # Arguments
3174    ///
3175    /// * `ttl` — Maximum hop count for the random walk (`1`–`5`). Typical: `2`.
3176    /// * `timeout_ms` — Query timeout in milliseconds. Typical: `5000`.
3177    ///
3178    /// # Errors
3179    ///
3180    /// Returns [`error::NetworkError::NodeError`] if no network config was provided.
3181    pub async fn discover_agents_foaf(
3182        &self,
3183        ttl: u8,
3184        timeout_ms: u64,
3185    ) -> error::NetworkResult<Vec<DiscoveredAgent>> {
3186        let pw = self.presence.as_ref().ok_or_else(|| {
3187            error::NetworkError::NodeError("presence system not initialized".to_string())
3188        })?;
3189
3190        let topic = presence::global_presence_topic();
3191        let raw_results: Vec<(
3192            saorsa_gossip_types::PeerId,
3193            saorsa_gossip_types::PresenceRecord,
3194        )> = pw
3195            .manager()
3196            .initiate_foaf_query(topic, ttl, timeout_ms)
3197            .await
3198            .map_err(|e| error::NetworkError::NodeError(e.to_string()))?;
3199
3200        let cache = self.identity_discovery_cache.read().await;
3201
3202        // Convert and deduplicate by agent_id.
3203        let mut seen: std::collections::HashSet<identity::AgentId> =
3204            std::collections::HashSet::new();
3205        let mut agents: Vec<DiscoveredAgent> = Vec::with_capacity(raw_results.len());
3206
3207        for (peer_id, record) in &raw_results {
3208            if let Some(agent) =
3209                presence::presence_record_to_discovered_agent(*peer_id, record, &cache)
3210            {
3211                if seen.insert(agent.agent_id) {
3212                    agents.push(agent);
3213                }
3214            }
3215        }
3216
3217        Ok(agents)
3218    }
3219
3220    /// Discover a specific agent by their [`identity::AgentId`] via FOAF random walk.
3221    ///
3222    /// Fast-path: checks the local identity discovery cache first and returns
3223    /// immediately if the agent is already known.
3224    ///
3225    /// Slow-path: performs a FOAF random walk (see [`discover_agents_foaf`](Agent::discover_agents_foaf))
3226    /// and searches the results for a matching `AgentId`.
3227    ///
3228    /// Returns `None` if the agent is not found within the given `ttl` and `timeout_ms`.
3229    ///
3230    /// # Errors
3231    ///
3232    /// Returns [`error::NetworkError::NodeCreation`] if no network config was provided.
3233    pub async fn discover_agent_by_id(
3234        &self,
3235        target_id: identity::AgentId,
3236        ttl: u8,
3237        timeout_ms: u64,
3238    ) -> error::NetworkResult<Option<DiscoveredAgent>> {
3239        // Fast path: already in local cache.
3240        {
3241            let cache = self.identity_discovery_cache.read().await;
3242            if let Some(agent) = cache.get(&target_id) {
3243                return Ok(Some(agent.clone()));
3244            }
3245        }
3246
3247        // Slow path: FOAF random walk.
3248        let agents = self.discover_agents_foaf(ttl, timeout_ms).await?;
3249        Ok(agents.into_iter().find(|a| a.agent_id == target_id))
3250    }
3251
3252    /// Find an agent by ID, returning its known addresses.
3253    ///
3254    /// Performs a three-stage lookup:
3255    /// 1. **Cache hit** — return addresses immediately if the agent has already
3256    ///    been discovered.
3257    /// 2. **Shard subscription** — subscribe to the agent's identity shard topic
3258    ///    and wait up to 5 seconds for a heartbeat announcement.
3259    /// 3. **Rendezvous** — subscribe to the agent's rendezvous shard topic and
3260    ///    wait up to 5 seconds for a `ProviderSummary` advertisement.  This
3261    ///    works even when the two agents are on different gossip overlay clusters.
3262    ///
3263    /// Returns `None` if the agent is not found within the combined deadline.
3264    ///
3265    /// # Errors
3266    ///
3267    /// Returns an error if the gossip runtime is not initialized.
3268    pub async fn find_agent(
3269        &self,
3270        agent_id: identity::AgentId,
3271    ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
3272        self.start_identity_listener().await?;
3273
3274        // Stage 1: cache hit.
3275        if let Some(addrs) = self
3276            .identity_discovery_cache
3277            .read()
3278            .await
3279            .get(&agent_id)
3280            .map(|e| e.addresses.clone())
3281        {
3282            return Ok(Some(addrs));
3283        }
3284
3285        // Stage 2: subscribe to the agent's identity shard topic and wait up to 5 s.
3286        let runtime = match self.gossip_runtime.as_ref() {
3287            Some(r) => r,
3288            None => return Ok(None),
3289        };
3290        let shard_topic = shard_topic_for_agent(&agent_id);
3291        let mut sub = runtime.pubsub().subscribe(shard_topic).await;
3292        let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
3293        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
3294
3295        loop {
3296            if tokio::time::Instant::now() >= deadline {
3297                break;
3298            }
3299            let timeout = tokio::time::sleep_until(deadline);
3300            tokio::select! {
3301                Some(msg) = sub.recv() => {
3302                    if let Ok(ann) = {
3303                        use bincode::Options;
3304                        bincode::DefaultOptions::new()
3305                            .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
3306                            .deserialize::<IdentityAnnouncement>(&msg.payload)
3307                    } {
3308                        if ann.verify().is_ok() && ann.agent_id == agent_id {
3309                            let now = std::time::SystemTime::now()
3310                                .duration_since(std::time::UNIX_EPOCH)
3311                                .map_or(0, |d| d.as_secs());
3312                            let filtered: Vec<std::net::SocketAddr> = ann
3313                                .addresses
3314                                .iter()
3315                                .copied()
3316                                .filter(|a| is_publicly_advertisable(*a))
3317                                .collect();
3318                            let addrs = filtered.clone();
3319                            cache.write().await.insert(
3320                                ann.agent_id,
3321                                DiscoveredAgent {
3322                                    agent_id: ann.agent_id,
3323                                    machine_id: ann.machine_id,
3324                                    user_id: ann.user_id,
3325                                    addresses: filtered,
3326                                    announced_at: ann.announced_at,
3327                                    last_seen: now,
3328                                    machine_public_key: ann.machine_public_key.clone(),
3329                                    nat_type: ann.nat_type.clone(),
3330                                    can_receive_direct: ann.can_receive_direct,
3331                                    is_relay: ann.is_relay,
3332                                    is_coordinator: ann.is_coordinator,
3333                                },
3334                            );
3335                            return Ok(Some(addrs));
3336                        }
3337                    }
3338                }
3339                _ = timeout => break,
3340            }
3341        }
3342
3343        // Stage 3: rendezvous shard subscription — wait up to 5 s.
3344        // Cache the result so subsequent connect_to_agent / send_direct can find it.
3345        if let Some(addrs) = self.find_agent_rendezvous(agent_id, 5).await? {
3346            let now = std::time::SystemTime::now()
3347                .duration_since(std::time::UNIX_EPOCH)
3348                .map_or(0, |d| d.as_secs());
3349            cache
3350                .write()
3351                .await
3352                .entry(agent_id)
3353                .or_insert_with(|| DiscoveredAgent {
3354                    agent_id,
3355                    machine_id: identity::MachineId([0u8; 32]),
3356                    user_id: None,
3357                    addresses: addrs.clone(),
3358                    announced_at: now,
3359                    last_seen: now,
3360                    machine_public_key: Vec::new(),
3361                    nat_type: None,
3362                    can_receive_direct: None,
3363                    is_relay: None,
3364                    is_coordinator: None,
3365                });
3366            return Ok(Some(addrs));
3367        }
3368
3369        Ok(None)
3370    }
3371
3372    /// Find all discovered agents claiming ownership by the given [`identity::UserId`].
3373    ///
3374    /// Only returns agents that announced with `include_user_identity: true`
3375    /// (i.e., agents whose [`DiscoveredAgent::user_id`] is `Some`).
3376    ///
3377    /// # Arguments
3378    ///
3379    /// * `user_id` - The user identity to search for
3380    ///
3381    /// # Errors
3382    ///
3383    /// Returns an error if the gossip runtime is not initialized.
3384    pub async fn find_agents_by_user(
3385        &self,
3386        user_id: identity::UserId,
3387    ) -> error::Result<Vec<DiscoveredAgent>> {
3388        self.start_identity_listener().await?;
3389        let cutoff = Self::unix_timestamp_secs().saturating_sub(self.identity_ttl_secs);
3390        Ok(self
3391            .identity_discovery_cache
3392            .read()
3393            .await
3394            .values()
3395            .filter(|a| a.announced_at >= cutoff && a.user_id == Some(user_id))
3396            .cloned()
3397            .collect())
3398    }
3399
3400    /// Return the local socket address this agent's network node is bound to, if any.
3401    ///
3402    /// Returns `None` if no network has been configured or if the bind address is
3403    /// not yet known.
3404    ///
3405    /// **Note:** If the node was configured with port 0, this returns port 0.
3406    /// Use [`bound_addr()`](Self::bound_addr) to get the OS-assigned port.
3407    #[must_use]
3408    pub fn local_addr(&self) -> Option<std::net::SocketAddr> {
3409        self.network.as_ref().and_then(|n| n.local_addr())
3410    }
3411
3412    /// Return the actual bound address from the QUIC endpoint.
3413    ///
3414    /// Unlike [`local_addr()`](Self::local_addr) which returns the configured value
3415    /// (possibly port 0), this queries the running endpoint for the real OS-assigned
3416    /// address. Returns `None` if no network has been configured.
3417    pub async fn bound_addr(&self) -> Option<std::net::SocketAddr> {
3418        if let Some(ref network) = self.network {
3419            let addr = network.bound_addr().await;
3420            // On dual-stack systems, bound_addr may return [::]:port even when
3421            // we bound to 127.0.0.1. Normalize to IPv4 if the original config
3422            // was IPv4.
3423            match (addr, self.local_addr()) {
3424                (Some(bound), Some(config)) if config.is_ipv4() && bound.is_ipv6() => {
3425                    Some(std::net::SocketAddr::new(config.ip(), bound.port()))
3426                }
3427                (Some(bound), _) => Some(bound),
3428                _ => None,
3429            }
3430        } else {
3431            None
3432        }
3433    }
3434
3435    /// Build a signed [`IdentityAnnouncement`] for this agent.
3436    ///
3437    /// Delegates to the internal `build_identity_announcement` method.
3438    ///
3439    /// # Errors
3440    ///
3441    /// Returns an error if key signing fails or human consent is required but not given.
3442    pub fn build_announcement(
3443        &self,
3444        include_user: bool,
3445        consent: bool,
3446    ) -> error::Result<IdentityAnnouncement> {
3447        self.build_identity_announcement(include_user, consent)
3448    }
3449
3450    /// Start the background identity heartbeat task.
3451    ///
3452    /// Idempotent — if the heartbeat is already running, returns `Ok(())` immediately.
3453    /// The heartbeat re-announces this agent's identity at `heartbeat_interval_secs`
3454    /// intervals so that late-joining peers can discover it without waiting for a
3455    /// Start the network event reconciliation listener.
3456    ///
3457    /// This bridges transport-level peer connect/disconnect events into the
3458    /// agent-level direct messaging registry so inbound accepted connections are
3459    /// usable for reverse direct sends before the first inbound direct payload.
3460    fn start_network_event_listener(&self) {
3461        if self
3462            .network_event_listener_started
3463            .swap(true, std::sync::atomic::Ordering::AcqRel)
3464        {
3465            return;
3466        }
3467
3468        let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
3469            return;
3470        };
3471        let cache = std::sync::Arc::clone(&self.identity_discovery_cache);
3472        let dm = std::sync::Arc::clone(&self.direct_messaging);
3473
3474        tokio::spawn(async move {
3475            let mut rx = network.subscribe();
3476            tracing::info!("Network event reconciliation listener started");
3477
3478            loop {
3479                let event = match rx.recv().await {
3480                    Ok(event) => event,
3481                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
3482                        tracing::warn!("Network event listener lagged by {skipped} events");
3483                        continue;
3484                    }
3485                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
3486                };
3487
3488                match event {
3489                    network::NetworkEvent::PeerConnected { peer_id, .. } => {
3490                        let machine_id = identity::MachineId(peer_id);
3491                        let cached_agent_id = {
3492                            let cache = cache.read().await;
3493                            cache
3494                                .values()
3495                                .find(|entry| entry.machine_id == machine_id)
3496                                .map(|entry| entry.agent_id)
3497                        };
3498                        let agent_id = match cached_agent_id {
3499                            Some(agent_id) => Some(agent_id),
3500                            None => dm.lookup_agent(&machine_id).await,
3501                        };
3502                        if let Some(agent_id) = agent_id {
3503                            dm.mark_connected(agent_id, machine_id).await;
3504                        }
3505                    }
3506                    network::NetworkEvent::PeerDisconnected { peer_id } => {
3507                        let machine_id = identity::MachineId(peer_id);
3508                        let cached_agent_id = {
3509                            let cache = cache.read().await;
3510                            cache
3511                                .values()
3512                                .find(|entry| entry.machine_id == machine_id)
3513                                .map(|entry| entry.agent_id)
3514                        };
3515                        let agent_id = match cached_agent_id {
3516                            Some(agent_id) => Some(agent_id),
3517                            None => dm.lookup_agent(&machine_id).await,
3518                        };
3519                        if let Some(agent_id) = agent_id {
3520                            dm.mark_disconnected(&agent_id).await;
3521                        }
3522                    }
3523                    _ => {}
3524                }
3525            }
3526        });
3527    }
3528
3529    /// Start the direct message listener background task.
3530    ///
3531    /// This task reads raw direct messages from the network layer and
3532    /// dispatches them to `DirectMessaging::handle_incoming()`, which
3533    /// broadcasts to all `subscribe_direct()` receivers.
3534    ///
3535    /// Called automatically by [`Agent::join_network`].
3536    fn start_direct_listener(&self) {
3537        if self
3538            .direct_listener_started
3539            .swap(true, std::sync::atomic::Ordering::AcqRel)
3540        {
3541            return;
3542        }
3543
3544        let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
3545            return;
3546        };
3547        let dm = std::sync::Arc::clone(&self.direct_messaging);
3548        let discovery_cache = std::sync::Arc::clone(&self.identity_discovery_cache);
3549        let contact_store = std::sync::Arc::clone(&self.contact_store);
3550
3551        tokio::spawn(async move {
3552            tracing::info!(target: "x0x::direct", stage = "listener", "direct message listener started");
3553            loop {
3554                let Some((ant_peer_id, payload)) = network.recv_direct().await else {
3555                    tracing::warn!(
3556                        target: "x0x::direct",
3557                        stage = "listener",
3558                        "network.recv_direct channel closed — listener exiting"
3559                    );
3560                    break;
3561                };
3562
3563                let raw_bytes = payload.len();
3564
3565                // Parse: first 32 bytes = sender AgentId, rest = payload
3566                if payload.len() < 32 {
3567                    tracing::warn!(
3568                        target: "x0x::direct",
3569                        stage = "listener",
3570                        machine_prefix = %network::hex_prefix(&ant_peer_id.0, 4),
3571                        raw_bytes,
3572                        outcome = "drop_too_short",
3573                        "direct message too short to contain sender id"
3574                    );
3575                    continue;
3576                }
3577
3578                let mut sender_bytes = [0u8; 32];
3579                sender_bytes.copy_from_slice(&payload[..32]);
3580                let sender = identity::AgentId(sender_bytes);
3581                let machine_id = identity::MachineId(ant_peer_id.0);
3582                let data = payload[32..].to_vec();
3583                let payload_bytes = data.len();
3584
3585                // Verify AgentId→MachineId binding against identity discovery cache.
3586                let verified = {
3587                    let cache = discovery_cache.read().await;
3588                    cache
3589                        .get(&sender)
3590                        .map(|entry| entry.machine_id == machine_id)
3591                        .unwrap_or(false)
3592                };
3593
3594                // Evaluate trust for the (AgentId, MachineId) pair.
3595                let trust_decision = {
3596                    let contacts = contact_store.read().await;
3597                    let evaluator = trust::TrustEvaluator::new(&contacts);
3598                    let ctx = trust::TrustContext {
3599                        agent_id: &sender,
3600                        machine_id: &machine_id,
3601                    };
3602                    Some(evaluator.evaluate(&ctx))
3603                };
3604
3605                tracing::info!(
3606                    target: "x0x::direct",
3607                    stage = "recv",
3608                    sender_prefix = %network::hex_prefix(&sender.0, 4),
3609                    machine_prefix = %network::hex_prefix(&machine_id.0, 4),
3610                    raw_bytes,
3611                    payload_bytes,
3612                    verified,
3613                    trust_decision = ?trust_decision,
3614                    "direct message received; dispatching to subscribers"
3615                );
3616
3617                // Register and mark the sender as connected for future reverse direct sends.
3618                dm.mark_connected(sender, machine_id).await;
3619
3620                // Broadcast to all subscribe_direct() receivers with verification info.
3621                dm.handle_incoming(machine_id, sender, data, verified, trust_decision)
3622                    .await;
3623
3624                tracing::debug!(
3625                    target: "x0x::direct",
3626                    stage = "recv",
3627                    sender_prefix = %network::hex_prefix(&sender.0, 4),
3628                    payload_bytes,
3629                    subscriber_count = dm.subscriber_count(),
3630                    "direct message dispatched"
3631                );
3632            }
3633        });
3634    }
3635
3636    /// new announcement.
3637    ///
3638    /// Called automatically by [`Agent::join_network`].
3639    ///
3640    /// # Errors
3641    ///
3642    /// Returns an error if a required network or gossip component is missing.
3643    pub async fn start_identity_heartbeat(&self) -> error::Result<()> {
3644        let mut handle_guard = self.heartbeat_handle.lock().await;
3645        if handle_guard.is_some() {
3646            return Ok(());
3647        }
3648        let Some(runtime) = self.gossip_runtime.as_ref().map(std::sync::Arc::clone) else {
3649            return Err(error::IdentityError::Storage(std::io::Error::other(
3650                "gossip runtime not initialized — cannot start heartbeat",
3651            )));
3652        };
3653        let Some(network) = self.network.as_ref().map(std::sync::Arc::clone) else {
3654            return Err(error::IdentityError::Storage(std::io::Error::other(
3655                "network not initialized — cannot start heartbeat",
3656            )));
3657        };
3658        let ctx = HeartbeatContext {
3659            identity: std::sync::Arc::clone(&self.identity),
3660            runtime,
3661            network,
3662            interval_secs: self.heartbeat_interval_secs,
3663            cache: std::sync::Arc::clone(&self.identity_discovery_cache),
3664            user_identity_consented: std::sync::Arc::clone(&self.user_identity_consented),
3665        };
3666        let handle = tokio::task::spawn(async move {
3667            let mut ticker =
3668                tokio::time::interval(std::time::Duration::from_secs(ctx.interval_secs));
3669            ticker.tick().await; // skip first immediate tick
3670            loop {
3671                ticker.tick().await;
3672                if let Err(e) = ctx.announce().await {
3673                    tracing::warn!("identity heartbeat announce failed: {e}");
3674                }
3675            }
3676        });
3677        *handle_guard = Some(handle);
3678        Ok(())
3679    }
3680
3681    /// Publish a rendezvous `ProviderSummary` for this agent.
3682    ///
3683    /// Enables global findability across gossip overlay partitions.  Seekers
3684    /// that have never been on the same partition as this agent can still
3685    /// discover it by subscribing to the rendezvous shard topic and waiting
3686    /// for the next heartbeat advertisement.
3687    ///
3688    /// The summary is signed with this agent's machine key and contains the
3689    /// agent's reachability addresses in the `extensions` field (bincode-encoded
3690    /// `Vec<SocketAddr>`).
3691    ///
3692    /// # Re-advertisement contract
3693    ///
3694    /// Rendezvous summaries expire after `validity_ms` milliseconds.  **Callers
3695    /// are responsible for calling `advertise_identity` again before expiry** so
3696    /// that seekers can always find a fresh record.  A common strategy is to
3697    /// re-advertise every `validity_ms / 2`.  The `x0xd` daemon does this
3698    /// automatically via its background re-advertisement task.
3699    ///
3700    /// # Arguments
3701    ///
3702    /// * `validity_ms` — How long (milliseconds) before the summary expires.
3703    ///   After this time, seekers will no longer discover this agent via rendezvous
3704    ///   unless a fresh `advertise_identity` call is made.
3705    ///
3706    /// # Errors
3707    ///
3708    /// Returns an error if the gossip runtime is not initialized, serialization
3709    /// fails, or signing fails.
3710    pub async fn advertise_identity(&self, validity_ms: u64) -> error::Result<()> {
3711        use saorsa_gossip_rendezvous::{Capability, ProviderSummary};
3712
3713        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3714            error::IdentityError::Storage(std::io::Error::other(
3715                "gossip runtime not initialized — cannot advertise identity",
3716            ))
3717        })?;
3718
3719        let peer_id = runtime.peer_id();
3720        let addresses = self.announcement_addresses();
3721        let addr_bytes = bincode::serialize(&addresses).map_err(|e| {
3722            error::IdentityError::Serialization(format!(
3723                "failed to serialize addresses for rendezvous: {e}"
3724            ))
3725        })?;
3726
3727        let mut summary = ProviderSummary::new(
3728            self.agent_id().0,
3729            peer_id,
3730            vec![Capability::Identity],
3731            validity_ms,
3732        )
3733        .with_extensions(addr_bytes);
3734
3735        summary
3736            .sign_raw(self.identity.machine_keypair().secret_key().as_bytes())
3737            .map_err(|e| {
3738                error::IdentityError::Storage(std::io::Error::other(format!(
3739                    "failed to sign rendezvous summary: {e}"
3740                )))
3741            })?;
3742
3743        let cbor_bytes = summary.to_cbor().map_err(|e| {
3744            error::IdentityError::Serialization(format!(
3745                "failed to CBOR-encode rendezvous summary: {e}"
3746            ))
3747        })?;
3748
3749        let topic = rendezvous_shard_topic_for_agent(&self.agent_id());
3750        runtime
3751            .pubsub()
3752            .publish(topic, bytes::Bytes::from(cbor_bytes))
3753            .await
3754            .map_err(|e| {
3755                error::IdentityError::Storage(std::io::Error::other(format!(
3756                    "failed to publish rendezvous summary: {e}"
3757                )))
3758            })?;
3759
3760        self.rendezvous_advertised
3761            .store(true, std::sync::atomic::Ordering::Relaxed);
3762        Ok(())
3763    }
3764
3765    /// Search for an agent via rendezvous shard subscription.
3766    ///
3767    /// Subscribes to the rendezvous shard topic for `agent_id` and waits up to
3768    /// `timeout_secs` for a matching [`saorsa_gossip_rendezvous::ProviderSummary`].
3769    /// On success the addresses encoded in the summary `extensions` field are
3770    /// returned.
3771    ///
3772    /// This is Stage 3 of [`Agent::find_agent`]'s lookup cascade.
3773    ///
3774    /// # Errors
3775    ///
3776    /// Returns an error if the gossip runtime is not initialized.
3777    pub async fn find_agent_rendezvous(
3778        &self,
3779        agent_id: identity::AgentId,
3780        timeout_secs: u64,
3781    ) -> error::Result<Option<Vec<std::net::SocketAddr>>> {
3782        use saorsa_gossip_rendezvous::ProviderSummary;
3783
3784        let runtime = match self.gossip_runtime.as_ref() {
3785            Some(r) => r,
3786            None => return Ok(None),
3787        };
3788
3789        let topic = rendezvous_shard_topic_for_agent(&agent_id);
3790        let mut sub = runtime.pubsub().subscribe(topic).await;
3791        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3792
3793        loop {
3794            if tokio::time::Instant::now() >= deadline {
3795                break;
3796            }
3797            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
3798            tokio::select! {
3799                Some(msg) = sub.recv() => {
3800                    let summary = match ProviderSummary::from_cbor(&msg.payload) {
3801                        Ok(s) => s,
3802                        Err(_) => continue,
3803                    };
3804                    if summary.target != agent_id.0 {
3805                        continue;
3806                    }
3807                    // Verify the summary signature when the advertiser's machine
3808                    // public key is cached from a prior identity announcement.
3809                    // Without a cached key we still accept the addresses — they
3810                    // are connection hints only; the subsequent QUIC handshake will
3811                    // fail cryptographically if the endpoint is not the genuine agent.
3812                    let cached_pub = self
3813                        .identity_discovery_cache
3814                        .read()
3815                        .await
3816                        .get(&agent_id)
3817                        .map(|e| e.machine_public_key.clone());
3818                    if let Some(pub_bytes) = cached_pub {
3819                        if !pub_bytes.is_empty()
3820                            && !summary.verify_raw(&pub_bytes).unwrap_or(false)
3821                        {
3822                            tracing::warn!(
3823                                "Rendezvous summary signature verification failed for agent {:?}; discarding",
3824                                agent_id
3825                            );
3826                            continue;
3827                        }
3828                    }
3829                    // Decode addresses from the extensions field.
3830                    let addrs: Vec<std::net::SocketAddr> = summary
3831                        .extensions
3832                        .as_deref()
3833                        .and_then(|b| {
3834                            use bincode::Options;
3835                            bincode::DefaultOptions::new()
3836                                .with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
3837                                .deserialize(b)
3838                                .ok()
3839                        })
3840                        .unwrap_or_default();
3841                    if !addrs.is_empty() {
3842                        return Ok(Some(addrs));
3843                    }
3844                }
3845                _ = tokio::time::sleep(remaining) => break,
3846            }
3847        }
3848
3849        Ok(None)
3850    }
3851
3852    /// Insert a discovered agent into the cache (for testing only).
3853    ///
3854    /// # Arguments
3855    ///
3856    /// * `agent` - The agent entry to insert.
3857    #[doc(hidden)]
3858    pub async fn insert_discovered_agent_for_testing(&self, agent: DiscoveredAgent) {
3859        let agent_id = agent.agent_id;
3860        let machine_id = agent.machine_id;
3861        self.identity_discovery_cache
3862            .write()
3863            .await
3864            .insert(agent_id, agent);
3865
3866        if machine_id.0 != [0u8; 32] {
3867            self.direct_messaging
3868                .register_agent(agent_id, machine_id)
3869                .await;
3870            if let Some(ref network) = self.network {
3871                let ant_peer_id = ant_quic::PeerId(machine_id.0);
3872                if network.is_connected(&ant_peer_id).await {
3873                    self.direct_messaging
3874                        .mark_connected(agent_id, machine_id)
3875                        .await;
3876                }
3877            }
3878        }
3879    }
3880
3881    /// Create a new collaborative task list bound to a topic.
3882    ///
3883    /// Creates a new `TaskList` and binds it to the specified gossip topic
3884    /// for automatic synchronization with other agents on the same topic.
3885    ///
3886    /// # Arguments
3887    ///
3888    /// * `name` - Human-readable name for the task list
3889    /// * `topic` - Gossip topic for synchronization
3890    ///
3891    /// # Returns
3892    ///
3893    /// A `TaskListHandle` for interacting with the task list.
3894    ///
3895    /// # Errors
3896    ///
3897    /// Returns an error if the gossip runtime is not initialized.
3898    ///
3899    /// # Example
3900    ///
3901    /// ```ignore
3902    /// let list = agent.create_task_list("Sprint Planning", "team-sprint").await?;
3903    /// ```
3904    pub async fn create_task_list(&self, name: &str, topic: &str) -> error::Result<TaskListHandle> {
3905        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3906            error::IdentityError::Storage(std::io::Error::other(
3907                "gossip runtime not initialized - configure agent with network first",
3908            ))
3909        })?;
3910
3911        let peer_id = runtime.peer_id();
3912        let list_id = crdt::TaskListId::from_content(name, &self.agent_id(), 0);
3913        let task_list = crdt::TaskList::new(list_id, name.to_string(), peer_id);
3914
3915        let sync = crdt::TaskListSync::new(
3916            task_list,
3917            std::sync::Arc::clone(runtime.pubsub()),
3918            topic.to_string(),
3919            30,
3920        )
3921        .map_err(|e| {
3922            error::IdentityError::Storage(std::io::Error::other(format!(
3923                "task list sync creation failed: {}",
3924                e
3925            )))
3926        })?;
3927
3928        let sync = std::sync::Arc::new(sync);
3929        sync.start().await.map_err(|e| {
3930            error::IdentityError::Storage(std::io::Error::other(format!(
3931                "task list sync start failed: {}",
3932                e
3933            )))
3934        })?;
3935
3936        Ok(TaskListHandle {
3937            sync,
3938            agent_id: self.agent_id(),
3939            peer_id,
3940        })
3941    }
3942
3943    /// Join an existing task list by topic.
3944    ///
3945    /// Connects to a task list that was created by another agent on the
3946    /// specified topic. The local replica will sync with peers automatically.
3947    ///
3948    /// # Arguments
3949    ///
3950    /// * `topic` - Gossip topic for the task list
3951    ///
3952    /// # Returns
3953    ///
3954    /// A `TaskListHandle` for interacting with the task list.
3955    ///
3956    /// # Errors
3957    ///
3958    /// Returns an error if the gossip runtime is not initialized.
3959    ///
3960    /// # Example
3961    ///
3962    /// ```ignore
3963    /// let list = agent.join_task_list("team-sprint").await?;
3964    /// ```
3965    pub async fn join_task_list(&self, topic: &str) -> error::Result<TaskListHandle> {
3966        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
3967            error::IdentityError::Storage(std::io::Error::other(
3968                "gossip runtime not initialized - configure agent with network first",
3969            ))
3970        })?;
3971
3972        let peer_id = runtime.peer_id();
3973        // Create empty task list; it will be populated via delta sync
3974        let list_id = crdt::TaskListId::from_content(topic, &self.agent_id(), 0);
3975        let task_list = crdt::TaskList::new(list_id, String::new(), peer_id);
3976
3977        let sync = crdt::TaskListSync::new(
3978            task_list,
3979            std::sync::Arc::clone(runtime.pubsub()),
3980            topic.to_string(),
3981            30,
3982        )
3983        .map_err(|e| {
3984            error::IdentityError::Storage(std::io::Error::other(format!(
3985                "task list sync creation failed: {}",
3986                e
3987            )))
3988        })?;
3989
3990        let sync = std::sync::Arc::new(sync);
3991        sync.start().await.map_err(|e| {
3992            error::IdentityError::Storage(std::io::Error::other(format!(
3993                "task list sync start failed: {}",
3994                e
3995            )))
3996        })?;
3997
3998        Ok(TaskListHandle {
3999            sync,
4000            agent_id: self.agent_id(),
4001            peer_id,
4002        })
4003    }
4004}
4005
4006impl AgentBuilder {
4007    /// Set a custom path for the machine keypair.
4008    ///
4009    /// If not set, the machine keypair is stored in `~/.x0x/machine.key`.
4010    ///
4011    /// # Arguments
4012    ///
4013    /// * `path` - The path to store the machine keypair.
4014    ///
4015    /// # Returns
4016    ///
4017    /// Self for chaining.
4018    pub fn with_machine_key<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4019        self.machine_key_path = Some(path.as_ref().to_path_buf());
4020        self
4021    }
4022
4023    /// Import an agent keypair.
4024    ///
4025    /// If not set, the agent keypair is loaded from storage (or generated fresh
4026    /// if no stored key exists).
4027    ///
4028    /// This enables running the same agent on multiple machines by importing
4029    /// the same agent keypair (but with different machine keypairs).
4030    ///
4031    /// Note: When an explicit keypair is provided via this method, it takes
4032    /// precedence over `with_agent_key_path()`.
4033    ///
4034    /// # Arguments
4035    ///
4036    /// * `keypair` - The agent keypair to import.
4037    ///
4038    /// # Returns
4039    ///
4040    /// Self for chaining.
4041    pub fn with_agent_key(mut self, keypair: identity::AgentKeypair) -> Self {
4042        self.agent_keypair = Some(keypair);
4043        self
4044    }
4045
4046    /// Set a custom path for the agent keypair.
4047    ///
4048    /// If not set, the agent keypair is stored in `~/.x0x/agent.key`.
4049    /// If no stored key is found at the path, a fresh one is generated and saved.
4050    ///
4051    /// This is ignored when `with_agent_key()` provides an explicit keypair.
4052    ///
4053    /// # Arguments
4054    ///
4055    /// * `path` - The path to store/load the agent keypair.
4056    ///
4057    /// # Returns
4058    ///
4059    /// Self for chaining.
4060    pub fn with_agent_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4061        self.agent_key_path = Some(path.as_ref().to_path_buf());
4062        self
4063    }
4064
4065    /// Set a custom path for the agent certificate (`agent.cert`).
4066    ///
4067    /// Required for multi-daemon setups that share a host — the default
4068    /// path (`~/.x0x/agent.cert`) is shared across daemons, causing
4069    /// last-writer-wins trampling that makes peers reject identity
4070    /// announcements as `agent certificate agent_id mismatch`.
4071    ///
4072    /// # Arguments
4073    ///
4074    /// * `path` - The path to use for the agent certificate file.
4075    ///
4076    /// # Returns
4077    ///
4078    /// Self for chaining.
4079    #[must_use]
4080    pub fn with_agent_cert_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4081        self.agent_cert_path = Some(path.as_ref().to_path_buf());
4082        self
4083    }
4084
4085    /// Set network configuration for P2P communication.
4086    ///
4087    /// If not set, default network configuration is used.
4088    ///
4089    /// # Arguments
4090    ///
4091    /// * `config` - The network configuration to use.
4092    ///
4093    /// # Returns
4094    ///
4095    /// Self for chaining.
4096    pub fn with_network_config(mut self, config: network::NetworkConfig) -> Self {
4097        self.network_config = Some(config);
4098        self
4099    }
4100
4101    /// Set the directory for the bootstrap peer cache.
4102    ///
4103    /// The cache persists peer quality metrics across restarts, enabling
4104    /// cache-first join strategy. Defaults to `~/.x0x/peers/` if not set.
4105    /// Falls back to `./.x0x/peers/` (relative to CWD) if `$HOME` is unset.
4106    pub fn with_peer_cache_dir<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4107        self.peer_cache_dir = Some(path.as_ref().to_path_buf());
4108        self
4109    }
4110
4111    /// Disable the bootstrap peer cache entirely.
4112    ///
4113    /// When set, the agent will not open or load any cached peers on
4114    /// startup. This ensures complete network isolation from previously
4115    /// seen peers for embedders and dedicated test harnesses.
4116    ///
4117    /// Note: the x0xd daemon's `--no-hard-coded-bootstrap` flag does
4118    /// not call this; it only clears configured seed peers.
4119    pub fn with_peer_cache_disabled(mut self) -> Self {
4120        self.disable_peer_cache = true;
4121        self
4122    }
4123
4124    /// Import a user keypair for three-layer identity.
4125    ///
4126    /// This binds a human identity to this agent. When provided, an
4127    /// [`identity::AgentCertificate`] is automatically issued (if one
4128    /// doesn't already exist in storage) to cryptographically attest
4129    /// that this agent belongs to the user.
4130    ///
4131    /// Note: When an explicit keypair is provided via this method, it takes
4132    /// precedence over `with_user_key_path()`.
4133    ///
4134    /// # Arguments
4135    ///
4136    /// * `keypair` - The user keypair to import.
4137    ///
4138    /// # Returns
4139    ///
4140    /// Self for chaining.
4141    pub fn with_user_key(mut self, keypair: identity::UserKeypair) -> Self {
4142        self.user_keypair = Some(keypair);
4143        self
4144    }
4145
4146    /// Set a custom path for the user keypair.
4147    ///
4148    /// Unlike machine and agent keys, user keys are **not** auto-generated.
4149    /// If the file at this path doesn't exist, no user identity is set
4150    /// (the agent operates with two-layer identity).
4151    ///
4152    /// This is ignored when `with_user_key()` provides an explicit keypair.
4153    ///
4154    /// # Arguments
4155    ///
4156    /// * `path` - The path to load the user keypair from.
4157    ///
4158    /// # Returns
4159    ///
4160    /// Self for chaining.
4161    pub fn with_user_key_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4162        self.user_key_path = Some(path.as_ref().to_path_buf());
4163        self
4164    }
4165
4166    /// Set the identity heartbeat re-announcement interval.
4167    ///
4168    /// Defaults to [`IDENTITY_HEARTBEAT_INTERVAL_SECS`] (300 seconds).
4169    ///
4170    /// # Arguments
4171    ///
4172    /// * `secs` - Interval in seconds between identity re-announcements.
4173    #[must_use]
4174    pub fn with_heartbeat_interval(mut self, secs: u64) -> Self {
4175        self.heartbeat_interval_secs = Some(secs);
4176        self
4177    }
4178
4179    /// Set the identity cache TTL.
4180    ///
4181    /// Cache entries with `last_seen` older than this threshold are filtered
4182    /// from [`Agent::presence`] and [`Agent::discovered_agents`].
4183    ///
4184    /// Defaults to [`IDENTITY_TTL_SECS`] (900 seconds).
4185    ///
4186    /// # Arguments
4187    ///
4188    /// * `secs` - Time-to-live in seconds for discovered agent entries.
4189    #[must_use]
4190    pub fn with_identity_ttl(mut self, secs: u64) -> Self {
4191        self.identity_ttl_secs = Some(secs);
4192        self
4193    }
4194
4195    /// Override the presence beacon broadcast interval in seconds.
4196    #[must_use]
4197    pub fn with_presence_beacon_interval(mut self, secs: u64) -> Self {
4198        self.presence_beacon_interval_secs = Some(secs);
4199        self
4200    }
4201
4202    /// Override the presence event poll interval in seconds.
4203    #[must_use]
4204    pub fn with_presence_event_poll_interval(mut self, secs: u64) -> Self {
4205        self.presence_event_poll_interval_secs = Some(secs);
4206        self
4207    }
4208
4209    /// Override the fallback offline timeout used by presence events.
4210    #[must_use]
4211    pub fn with_presence_offline_timeout(mut self, secs: u64) -> Self {
4212        self.presence_offline_timeout_secs = Some(secs);
4213        self
4214    }
4215
4216    /// Set a custom path for the contacts file.
4217    ///
4218    /// The contacts file persists trust levels and machine records for known
4219    /// agents. Defaults to `~/.x0x/contacts.json` if not set.
4220    ///
4221    /// # Arguments
4222    ///
4223    /// * `path` - The path for the contacts file.
4224    #[must_use]
4225    pub fn with_contact_store_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
4226        self.contact_store_path = Some(path.as_ref().to_path_buf());
4227        self
4228    }
4229
4230    /// Build and initialise the agent.
4231    ///
4232    /// This performs the following:
4233    /// 1. Loads or generates the machine keypair (stored in `~/.x0x/machine.key` by default)
4234    /// 2. Uses provided agent keypair or generates a fresh one
4235    /// 3. Combines both into a unified Identity
4236    ///
4237    /// The machine keypair is automatically persisted to storage.
4238    ///
4239    /// # Errors
4240    ///
4241    /// Returns an error if:
4242    /// - Machine keypair generation fails
4243    /// - Storage I/O fails
4244    /// - Keypair deserialization fails
4245    pub async fn build(self) -> error::Result<Agent> {
4246        // Determine machine keypair source
4247        let machine_keypair = if let Some(path) = self.machine_key_path {
4248            // Try to load from custom path
4249            match storage::load_machine_keypair_from(&path).await {
4250                Ok(kp) => kp,
4251                Err(_) => {
4252                    // Generate fresh keypair and save to custom path
4253                    let kp = identity::MachineKeypair::generate()?;
4254                    storage::save_machine_keypair_to(&kp, &path).await?;
4255                    kp
4256                }
4257            }
4258        } else if storage::machine_keypair_exists().await {
4259            // Load default machine keypair
4260            storage::load_machine_keypair().await?
4261        } else {
4262            // Generate and save default machine keypair
4263            let kp = identity::MachineKeypair::generate()?;
4264            storage::save_machine_keypair(&kp).await?;
4265            kp
4266        };
4267
4268        // Resolve agent keypair: explicit > path-based > default storage > generate
4269        let agent_keypair = if let Some(kp) = self.agent_keypair {
4270            // Explicit keypair takes highest precedence
4271            kp
4272        } else if let Some(path) = self.agent_key_path {
4273            // Custom path: load or generate+save
4274            match storage::load_agent_keypair_from(&path).await {
4275                Ok(kp) => kp,
4276                Err(_) => {
4277                    let kp = identity::AgentKeypair::generate()?;
4278                    storage::save_agent_keypair_to(&kp, &path).await?;
4279                    kp
4280                }
4281            }
4282        } else if storage::agent_keypair_exists().await {
4283            // Default path exists: load it
4284            storage::load_agent_keypair_default().await?
4285        } else {
4286            // No stored key: generate and persist
4287            let kp = identity::AgentKeypair::generate()?;
4288            storage::save_agent_keypair_default(&kp).await?;
4289            kp
4290        };
4291
4292        // Resolve user keypair: explicit > path-based > default storage > None (opt-in)
4293        let user_keypair = if let Some(kp) = self.user_keypair {
4294            Some(kp)
4295        } else if let Some(path) = self.user_key_path {
4296            // Custom path: load if exists, otherwise None (don't auto-generate)
4297            storage::load_user_keypair_from(&path).await.ok()
4298        } else if storage::user_keypair_exists().await {
4299            // Default path exists: load it
4300            storage::load_user_keypair().await.ok()
4301        } else {
4302            None
4303        };
4304
4305        // Build identity with optional user layer.
4306        //
4307        // The agent certificate binds (user_id, agent_id). On load we must
4308        // verify BOTH halves of the binding match the current identity —
4309        // user_id AND agent_id — and re-issue if either diverges. A mismatch
4310        // happens in two practical scenarios:
4311        //   1. The user key was replaced (cert's user_id no longer ours).
4312        //   2. Multi-daemon-per-host setups where the cert path is shared
4313        //      and a peer daemon overwrote it with their own cert (cert's
4314        //      agent_id no longer ours).
4315        // Without the agent_id half of the check, scenario (2) produces an
4316        // announcement whose cert binds another daemon's agent_id, and peers
4317        // reject it as "agent certificate agent_id mismatch".
4318        //
4319        // The per-daemon `agent_cert_path` (set by `with_agent_cert_path()`)
4320        // is the structural fix for scenario (2); the agent_id check is the
4321        // defensive net in case two processes still land on the same path.
4322        let identity = if let Some(user_kp) = user_keypair {
4323            let cert_path = self.agent_cert_path.clone();
4324            let existing_cert = if let Some(ref p) = cert_path {
4325                if tokio::fs::try_exists(p).await.unwrap_or(false) {
4326                    storage::load_agent_certificate_from(p).await.ok()
4327                } else {
4328                    None
4329                }
4330            } else if storage::agent_certificate_exists().await {
4331                storage::load_agent_certificate().await.ok()
4332            } else {
4333                None
4334            };
4335
4336            let cert_still_valid = existing_cert.as_ref().is_some_and(|c| {
4337                let user_match = c
4338                    .user_id()
4339                    .map(|uid| uid == user_kp.user_id())
4340                    .unwrap_or(false);
4341                let agent_match = c
4342                    .agent_id()
4343                    .map(|aid| aid == agent_keypair.agent_id())
4344                    .unwrap_or(false);
4345                user_match && agent_match
4346            });
4347
4348            let cert = if cert_still_valid {
4349                existing_cert.expect("cert_still_valid implies Some")
4350            } else {
4351                let new_cert = identity::AgentCertificate::issue(&user_kp, &agent_keypair)?;
4352                if let Some(ref p) = cert_path {
4353                    storage::save_agent_certificate_to(&new_cert, p).await?;
4354                } else {
4355                    storage::save_agent_certificate(&new_cert).await?;
4356                }
4357                new_cert
4358            };
4359            identity::Identity::new_with_user(machine_keypair, agent_keypair, user_kp, cert)
4360        } else {
4361            identity::Identity::new(machine_keypair, agent_keypair)
4362        };
4363
4364        // Open bootstrap peer cache if network will be configured
4365        // and the cache is not explicitly disabled by the caller.
4366        let bootstrap_cache = if self.network_config.is_some() && !self.disable_peer_cache {
4367            let cache_dir = self.peer_cache_dir.unwrap_or_else(|| {
4368                dirs::home_dir()
4369                    .unwrap_or_else(|| std::path::PathBuf::from("."))
4370                    .join(".x0x")
4371                    .join("peers")
4372            });
4373            let config = ant_quic::BootstrapCacheConfig::builder()
4374                .cache_dir(cache_dir)
4375                .min_peers_to_save(1)
4376                .build();
4377            match ant_quic::BootstrapCache::open(config).await {
4378                Ok(cache) => {
4379                    let cache = std::sync::Arc::new(cache);
4380                    std::sync::Arc::clone(&cache).start_maintenance();
4381                    Some(cache)
4382                }
4383                Err(e) => {
4384                    tracing::warn!("Failed to open bootstrap cache: {e}");
4385                    None
4386                }
4387            }
4388        } else {
4389            None
4390        };
4391
4392        // Create network node if configured
4393        // Pass the machine keypair so ant-quic PeerId == MachineId (identity unification)
4394        let machine_keypair = {
4395            let pk = ant_quic::MlDsaPublicKey::from_bytes(
4396                identity.machine_keypair().public_key().as_bytes(),
4397            )
4398            .map_err(|e| {
4399                error::IdentityError::Storage(std::io::Error::other(format!(
4400                    "invalid machine public key: {e}"
4401                )))
4402            })?;
4403            let sk = ant_quic::MlDsaSecretKey::from_bytes(
4404                identity.machine_keypair().secret_key().as_bytes(),
4405            )
4406            .map_err(|e| {
4407                error::IdentityError::Storage(std::io::Error::other(format!(
4408                    "invalid machine secret key: {e}"
4409                )))
4410            })?;
4411            Some((pk, sk))
4412        };
4413
4414        let network = if let Some(config) = self.network_config {
4415            let node = network::NetworkNode::new(config, bootstrap_cache.clone(), machine_keypair)
4416                .await
4417                .map_err(|e| {
4418                    error::IdentityError::Storage(std::io::Error::other(format!(
4419                        "network initialization failed: {}",
4420                        e
4421                    )))
4422                })?;
4423
4424            // Verify identity unification: ant-quic PeerId must equal MachineId
4425            debug_assert_eq!(
4426                node.peer_id().0,
4427                identity.machine_id().0,
4428                "ant-quic PeerId must equal MachineId after identity unification"
4429            );
4430
4431            Some(std::sync::Arc::new(node))
4432        } else {
4433            None
4434        };
4435
4436        // Create signing context from agent keypair for message authentication
4437        let signing_ctx = std::sync::Arc::new(gossip::SigningContext::from_keypair(
4438            identity.agent_keypair(),
4439        ));
4440
4441        // Create gossip runtime if network exists
4442        let gossip_runtime = if let Some(ref net) = network {
4443            let runtime = gossip::GossipRuntime::new(
4444                gossip::GossipConfig::default(),
4445                std::sync::Arc::clone(net),
4446                Some(signing_ctx),
4447            )
4448            .await
4449            .map_err(|e| {
4450                error::IdentityError::Storage(std::io::Error::other(format!(
4451                    "gossip runtime initialization failed: {}",
4452                    e
4453                )))
4454            })?;
4455            Some(std::sync::Arc::new(runtime))
4456        } else {
4457            None
4458        };
4459
4460        // Initialise contact store
4461        let contacts_path = self.contact_store_path.unwrap_or_else(|| {
4462            dirs::home_dir()
4463                .unwrap_or_else(|| std::path::PathBuf::from("."))
4464                .join(".x0x")
4465                .join("contacts.json")
4466        });
4467        let contact_store = std::sync::Arc::new(tokio::sync::RwLock::new(
4468            contacts::ContactStore::new(contacts_path),
4469        ));
4470
4471        // Wrap bootstrap cache with gossip coordinator adapter (zero duplication).
4472        let gossip_cache_adapter = bootstrap_cache.as_ref().map(|cache| {
4473            saorsa_gossip_coordinator::GossipCacheAdapter::new(std::sync::Arc::clone(cache))
4474        });
4475
4476        // Initialize direct messaging infrastructure
4477        let direct_messaging = std::sync::Arc::new(direct::DirectMessaging::new());
4478
4479        // Create presence wrapper if network exists
4480        let presence = if let Some(ref net) = network {
4481            let peer_id = saorsa_gossip_transport::GossipTransport::local_peer_id(net.as_ref());
4482            let mut presence_config = presence::PresenceConfig::default();
4483            if let Some(secs) = self.presence_beacon_interval_secs {
4484                presence_config.beacon_interval_secs = secs;
4485            }
4486            if let Some(secs) = self.presence_event_poll_interval_secs {
4487                presence_config.event_poll_interval_secs = secs;
4488            }
4489            if let Some(secs) = self.presence_offline_timeout_secs {
4490                presence_config.adaptive_timeout_fallback_secs = secs;
4491            }
4492            let pw = presence::PresenceWrapper::new(
4493                peer_id,
4494                std::sync::Arc::clone(net),
4495                presence_config,
4496                bootstrap_cache.clone(),
4497            )
4498            .map_err(|e| {
4499                error::IdentityError::Storage(std::io::Error::other(format!(
4500                    "presence initialization failed: {}",
4501                    e
4502                )))
4503            })?;
4504            let pw_arc = std::sync::Arc::new(pw);
4505            // Wire presence into gossip runtime for Bulk dispatch
4506            if let Some(ref rt) = gossip_runtime {
4507                rt.set_presence(std::sync::Arc::clone(&pw_arc));
4508            }
4509            Some(pw_arc)
4510        } else {
4511            None
4512        };
4513
4514        Ok(Agent {
4515            identity: std::sync::Arc::new(identity),
4516            network,
4517            gossip_runtime,
4518            bootstrap_cache,
4519            gossip_cache_adapter,
4520            identity_discovery_cache: std::sync::Arc::new(tokio::sync::RwLock::new(
4521                std::collections::HashMap::new(),
4522            )),
4523            identity_listener_started: std::sync::atomic::AtomicBool::new(false),
4524            heartbeat_interval_secs: self
4525                .heartbeat_interval_secs
4526                .unwrap_or(IDENTITY_HEARTBEAT_INTERVAL_SECS),
4527            identity_ttl_secs: self.identity_ttl_secs.unwrap_or(IDENTITY_TTL_SECS),
4528            heartbeat_handle: tokio::sync::Mutex::new(None),
4529            rendezvous_advertised: std::sync::atomic::AtomicBool::new(false),
4530            contact_store,
4531            direct_messaging,
4532            network_event_listener_started: std::sync::atomic::AtomicBool::new(false),
4533            direct_listener_started: std::sync::atomic::AtomicBool::new(false),
4534            presence,
4535            user_identity_consented: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
4536            capability_store: std::sync::Arc::new(dm_capability::CapabilityStore::new()),
4537            dm_capabilities_tx: std::sync::Arc::new({
4538                let (tx, _rx) = tokio::sync::watch::channel(dm::DmCapabilities::pending());
4539                tx
4540            }),
4541            dm_inflight_acks: std::sync::Arc::new(dm::InFlightAcks::new()),
4542            recent_delivery_cache: std::sync::Arc::new(dm::RecentDeliveryCache::with_defaults()),
4543            capability_advert_service: tokio::sync::Mutex::new(None),
4544            dm_inbox_service: tokio::sync::Mutex::new(None),
4545        })
4546    }
4547}
4548
4549/// Handle for interacting with a collaborative task list.
4550///
4551/// Provides a safe, concurrent interface to a TaskList backed by
4552/// CRDT synchronization. All operations are async and return Results.
4553///
4554/// # Example
4555///
4556/// ```ignore
4557/// let handle = agent.create_task_list("My List", "topic").await?;
4558/// let task_id = handle.add_task("Write docs".to_string(), "API docs".to_string()).await?;
4559/// handle.claim_task(task_id).await?;
4560/// handle.complete_task(task_id).await?;
4561/// ```
4562#[derive(Clone)]
4563pub struct TaskListHandle {
4564    sync: std::sync::Arc<crdt::TaskListSync>,
4565    agent_id: identity::AgentId,
4566    peer_id: saorsa_gossip_types::PeerId,
4567}
4568
4569impl std::fmt::Debug for TaskListHandle {
4570    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4571        f.debug_struct("TaskListHandle")
4572            .field("agent_id", &self.agent_id)
4573            .field("peer_id", &self.peer_id)
4574            .finish_non_exhaustive()
4575    }
4576}
4577
4578impl TaskListHandle {
4579    /// Add a new task to the list.
4580    ///
4581    /// # Arguments
4582    ///
4583    /// * `title` - Task title
4584    /// * `description` - Task description
4585    ///
4586    /// # Returns
4587    ///
4588    /// The TaskId of the created task.
4589    ///
4590    /// # Errors
4591    ///
4592    /// Returns an error if the task cannot be added.
4593    pub async fn add_task(
4594        &self,
4595        title: String,
4596        description: String,
4597    ) -> error::Result<crdt::TaskId> {
4598        let (task_id, delta) = {
4599            let mut list = self.sync.write().await;
4600            let seq = list.next_seq();
4601            let task_id = crdt::TaskId::new(&title, &self.agent_id, seq);
4602            let metadata = crdt::TaskMetadata::new(title, description, 128, self.agent_id, seq);
4603            let task = crdt::TaskItem::new(task_id, metadata, self.peer_id);
4604            list.add_task(task.clone(), self.peer_id, seq)
4605                .map_err(|e| {
4606                    error::IdentityError::Storage(std::io::Error::other(format!(
4607                        "add_task failed: {}",
4608                        e
4609                    )))
4610                })?;
4611            let tag = (self.peer_id, seq);
4612            let delta = crdt::TaskListDelta::for_add(task_id, task, tag, list.current_version());
4613            (task_id, delta)
4614        };
4615        // Best-effort replication: local mutation succeeded regardless
4616        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4617            tracing::warn!("failed to publish add_task delta: {}", e);
4618        }
4619        Ok(task_id)
4620    }
4621
4622    /// Claim a task in the list.
4623    ///
4624    /// # Arguments
4625    ///
4626    /// * `task_id` - ID of the task to claim
4627    ///
4628    /// # Errors
4629    ///
4630    /// Returns an error if the task cannot be claimed.
4631    pub async fn claim_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
4632        let delta = {
4633            let mut list = self.sync.write().await;
4634            let seq = list.next_seq();
4635            list.claim_task(&task_id, self.agent_id, self.peer_id, seq)
4636                .map_err(|e| {
4637                    error::IdentityError::Storage(std::io::Error::other(format!(
4638                        "claim_task failed: {}",
4639                        e
4640                    )))
4641                })?;
4642            // Include full task so receivers can upsert if add hasn't arrived yet
4643            let full_task = list
4644                .get_task(&task_id)
4645                .ok_or_else(|| {
4646                    error::IdentityError::Storage(std::io::Error::other(
4647                        "task disappeared after claim",
4648                    ))
4649                })?
4650                .clone();
4651            crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
4652        };
4653        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4654            tracing::warn!("failed to publish claim_task delta: {}", e);
4655        }
4656        Ok(())
4657    }
4658
4659    /// Complete a task in the list.
4660    ///
4661    /// # Arguments
4662    ///
4663    /// * `task_id` - ID of the task to complete
4664    ///
4665    /// # Errors
4666    ///
4667    /// Returns an error if the task cannot be completed.
4668    pub async fn complete_task(&self, task_id: crdt::TaskId) -> error::Result<()> {
4669        let delta = {
4670            let mut list = self.sync.write().await;
4671            let seq = list.next_seq();
4672            list.complete_task(&task_id, self.agent_id, self.peer_id, seq)
4673                .map_err(|e| {
4674                    error::IdentityError::Storage(std::io::Error::other(format!(
4675                        "complete_task failed: {}",
4676                        e
4677                    )))
4678                })?;
4679            let full_task = list
4680                .get_task(&task_id)
4681                .ok_or_else(|| {
4682                    error::IdentityError::Storage(std::io::Error::other(
4683                        "task disappeared after complete",
4684                    ))
4685                })?
4686                .clone();
4687            crdt::TaskListDelta::for_state_change(task_id, full_task, list.current_version())
4688        };
4689        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4690            tracing::warn!("failed to publish complete_task delta: {}", e);
4691        }
4692        Ok(())
4693    }
4694
4695    /// List all tasks in their current order.
4696    ///
4697    /// # Returns
4698    ///
4699    /// A vector of `TaskSnapshot` representing the current state.
4700    ///
4701    /// # Errors
4702    ///
4703    /// Returns an error if the task list cannot be read.
4704    pub async fn list_tasks(&self) -> error::Result<Vec<TaskSnapshot>> {
4705        let list = self.sync.read().await;
4706        let tasks = list.tasks_ordered();
4707        Ok(tasks
4708            .into_iter()
4709            .map(|task| TaskSnapshot {
4710                id: *task.id(),
4711                title: task.title().to_string(),
4712                description: task.description().to_string(),
4713                state: task.current_state(),
4714                assignee: task.assignee().copied(),
4715                owner: None,
4716                priority: task.priority(),
4717            })
4718            .collect())
4719    }
4720
4721    /// Reorder tasks in the list.
4722    ///
4723    /// # Arguments
4724    ///
4725    /// * `task_ids` - New ordering of task IDs
4726    ///
4727    /// # Errors
4728    ///
4729    /// Returns an error if reordering fails.
4730    pub async fn reorder(&self, task_ids: Vec<crdt::TaskId>) -> error::Result<()> {
4731        let delta = {
4732            let mut list = self.sync.write().await;
4733            list.reorder(task_ids.clone(), self.peer_id).map_err(|e| {
4734                error::IdentityError::Storage(std::io::Error::other(format!(
4735                    "reorder failed: {}",
4736                    e
4737                )))
4738            })?;
4739            crdt::TaskListDelta::for_reorder(task_ids, list.current_version())
4740        };
4741        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4742            tracing::warn!("failed to publish reorder delta: {}", e);
4743        }
4744        Ok(())
4745    }
4746}
4747
4748// ---------------------------------------------------------------------------
4749// KvStore API
4750// ---------------------------------------------------------------------------
4751
4752impl Agent {
4753    /// Create a new key-value store.
4754    ///
4755    /// The store is automatically synchronized to all peers subscribed
4756    /// to the same `topic` via gossip delta propagation.
4757    ///
4758    /// # Errors
4759    ///
4760    /// Returns an error if the gossip runtime is not initialized.
4761    pub async fn create_kv_store(&self, name: &str, topic: &str) -> error::Result<KvStoreHandle> {
4762        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
4763            error::IdentityError::Storage(std::io::Error::other(
4764                "gossip runtime not initialized - configure agent with network first",
4765            ))
4766        })?;
4767
4768        let peer_id = runtime.peer_id();
4769        let store_id = kv::KvStoreId::from_content(name, &self.agent_id());
4770        let store = kv::KvStore::new(
4771            store_id,
4772            name.to_string(),
4773            self.agent_id(),
4774            kv::AccessPolicy::Signed,
4775        );
4776
4777        let sync = kv::KvStoreSync::new(
4778            store,
4779            std::sync::Arc::clone(runtime.pubsub()),
4780            topic.to_string(),
4781            30,
4782        )
4783        .map_err(|e| {
4784            error::IdentityError::Storage(std::io::Error::other(format!(
4785                "kv store sync creation failed: {e}",
4786            )))
4787        })?;
4788
4789        let sync = std::sync::Arc::new(sync);
4790        sync.start().await.map_err(|e| {
4791            error::IdentityError::Storage(std::io::Error::other(format!(
4792                "kv store sync start failed: {e}",
4793            )))
4794        })?;
4795
4796        Ok(KvStoreHandle {
4797            sync,
4798            agent_id: self.agent_id(),
4799            peer_id,
4800        })
4801    }
4802
4803    /// Join an existing key-value store by topic.
4804    ///
4805    /// Creates an empty store that will be populated via delta sync
4806    /// from peers already sharing the topic. The access policy will
4807    /// be learned from the first full delta received from the owner.
4808    ///
4809    /// # Errors
4810    ///
4811    /// Returns an error if the gossip runtime is not initialized.
4812    pub async fn join_kv_store(&self, topic: &str) -> error::Result<KvStoreHandle> {
4813        let runtime = self.gossip_runtime.as_ref().ok_or_else(|| {
4814            error::IdentityError::Storage(std::io::Error::other(
4815                "gossip runtime not initialized - configure agent with network first",
4816            ))
4817        })?;
4818
4819        let peer_id = runtime.peer_id();
4820        let store_id = kv::KvStoreId::from_content(topic, &self.agent_id());
4821        // Use Encrypted as the most permissive default — the actual policy
4822        // will be set when the first delta from the owner arrives.
4823        let store = kv::KvStore::new(
4824            store_id,
4825            String::new(),
4826            self.agent_id(),
4827            kv::AccessPolicy::Encrypted {
4828                group_id: Vec::new(),
4829            },
4830        );
4831
4832        let sync = kv::KvStoreSync::new(
4833            store,
4834            std::sync::Arc::clone(runtime.pubsub()),
4835            topic.to_string(),
4836            30,
4837        )
4838        .map_err(|e| {
4839            error::IdentityError::Storage(std::io::Error::other(format!(
4840                "kv store sync creation failed: {e}",
4841            )))
4842        })?;
4843
4844        let sync = std::sync::Arc::new(sync);
4845        sync.start().await.map_err(|e| {
4846            error::IdentityError::Storage(std::io::Error::other(format!(
4847                "kv store sync start failed: {e}",
4848            )))
4849        })?;
4850
4851        Ok(KvStoreHandle {
4852            sync,
4853            agent_id: self.agent_id(),
4854            peer_id,
4855        })
4856    }
4857}
4858
4859/// Handle for interacting with a replicated key-value store.
4860///
4861/// Provides async methods for putting, getting, and removing entries.
4862/// Changes are automatically replicated to peers via gossip.
4863#[derive(Clone)]
4864pub struct KvStoreHandle {
4865    sync: std::sync::Arc<kv::KvStoreSync>,
4866    agent_id: identity::AgentId,
4867    peer_id: saorsa_gossip_types::PeerId,
4868}
4869
4870impl std::fmt::Debug for KvStoreHandle {
4871    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4872        f.debug_struct("KvStoreHandle")
4873            .field("agent_id", &self.agent_id)
4874            .field("peer_id", &self.peer_id)
4875            .finish_non_exhaustive()
4876    }
4877}
4878
4879impl KvStoreHandle {
4880    /// Put a key-value pair into the store.
4881    ///
4882    /// If the key already exists, the value is updated. Changes are
4883    /// automatically replicated to peers via gossip.
4884    ///
4885    /// # Errors
4886    ///
4887    /// Returns an error if the value exceeds the maximum inline size (64 KB).
4888    pub async fn put(
4889        &self,
4890        key: String,
4891        value: Vec<u8>,
4892        content_type: String,
4893    ) -> error::Result<()> {
4894        let delta = {
4895            let mut store = self.sync.write().await;
4896            store
4897                .put(
4898                    key.clone(),
4899                    value.clone(),
4900                    content_type.clone(),
4901                    self.peer_id,
4902                )
4903                .map_err(|e| {
4904                    error::IdentityError::Storage(std::io::Error::other(format!(
4905                        "kv put failed: {e}",
4906                    )))
4907                })?;
4908            let entry = store.get(&key).cloned();
4909            let version = store.current_version();
4910            match entry {
4911                Some(e) => {
4912                    kv::KvStoreDelta::for_put(key, e, (self.peer_id, store.next_seq()), version)
4913                }
4914                None => return Ok(()), // shouldn't happen after successful put
4915            }
4916        };
4917        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4918            tracing::warn!("failed to publish kv put delta: {e}");
4919        }
4920        Ok(())
4921    }
4922
4923    /// Get a value by key.
4924    ///
4925    /// Returns `None` if the key does not exist or has been removed.
4926    ///
4927    /// # Errors
4928    ///
4929    /// Returns an error if the store cannot be read.
4930    pub async fn get(&self, key: &str) -> error::Result<Option<KvEntrySnapshot>> {
4931        let store = self.sync.read().await;
4932        Ok(store.get(key).map(|e| KvEntrySnapshot {
4933            key: e.key.clone(),
4934            value: e.value.clone(),
4935            content_hash: hex::encode(e.content_hash),
4936            content_type: e.content_type.clone(),
4937            metadata: e.metadata.clone(),
4938            created_at: e.created_at,
4939            updated_at: e.updated_at,
4940        }))
4941    }
4942
4943    /// Remove a key from the store.
4944    ///
4945    /// # Errors
4946    ///
4947    /// Returns an error if the key does not exist.
4948    pub async fn remove(&self, key: &str) -> error::Result<()> {
4949        let delta = {
4950            let mut store = self.sync.write().await;
4951            store.remove(key).map_err(|e| {
4952                error::IdentityError::Storage(std::io::Error::other(format!(
4953                    "kv remove failed: {e}",
4954                )))
4955            })?;
4956            let mut d = kv::KvStoreDelta::new(store.current_version());
4957            d.removed
4958                .insert(key.to_string(), std::collections::HashSet::new());
4959            d
4960        };
4961        if let Err(e) = self.sync.publish_delta(self.peer_id, delta).await {
4962            tracing::warn!("failed to publish kv remove delta: {e}");
4963        }
4964        Ok(())
4965    }
4966
4967    /// List all active keys in the store.
4968    ///
4969    /// # Errors
4970    ///
4971    /// Returns an error if the store cannot be read.
4972    pub async fn keys(&self) -> error::Result<Vec<KvEntrySnapshot>> {
4973        let store = self.sync.read().await;
4974        Ok(store
4975            .active_entries()
4976            .into_iter()
4977            .map(|e| KvEntrySnapshot {
4978                key: e.key.clone(),
4979                value: e.value.clone(),
4980                content_hash: hex::encode(e.content_hash),
4981                content_type: e.content_type.clone(),
4982                metadata: e.metadata.clone(),
4983                created_at: e.created_at,
4984                updated_at: e.updated_at,
4985            })
4986            .collect())
4987    }
4988
4989    /// Get the store name.
4990    ///
4991    /// # Errors
4992    ///
4993    /// Returns an error if the store cannot be read.
4994    pub async fn name(&self) -> error::Result<String> {
4995        let store = self.sync.read().await;
4996        Ok(store.name().to_string())
4997    }
4998}
4999
/// Read-only snapshot of a KvStore entry.
///
/// Returned by `KvStoreHandle::get` and `KvStoreHandle::keys`. Every field is
/// an owned copy, so the snapshot remains usable after the store's lock is
/// released. Derives `serde::Serialize` for API/JSON output.
#[derive(Debug, Clone, serde::Serialize)]
pub struct KvEntrySnapshot {
    /// The key.
    pub key: String,
    /// The value bytes.
    pub value: Vec<u8>,
    /// BLAKE3 hash of the value (hex-encoded via `hex::encode`).
    pub content_hash: String,
    /// Content type (MIME).
    pub content_type: String,
    /// User metadata.
    pub metadata: std::collections::HashMap<String, String>,
    /// Unix milliseconds when created.
    pub created_at: u64,
    /// Unix milliseconds when last updated.
    pub updated_at: u64,
}
5018
/// Read-only snapshot of a task's current state.
///
/// This is returned by `TaskListHandle::list_tasks()` and hides CRDT
/// internals, providing a clean API surface. All fields are owned copies
/// taken while the list's read lock was held.
#[derive(Debug, Clone)]
pub struct TaskSnapshot {
    /// Unique task identifier.
    pub id: crdt::TaskId,
    /// Task title.
    pub title: String,
    /// Task description.
    pub description: String,
    /// Current checkbox state (Empty, Claimed, or Done).
    pub state: crdt::CheckboxState,
    /// Agent assigned to this task (if any).
    pub assignee: Option<identity::AgentId>,
    /// Human owner of the agent that created this task (if known).
    ///
    /// `TaskListHandle::list_tasks()` currently always sets this to `None`.
    pub owner: Option<identity::UserId>,
    /// Task priority (0-255, higher = more important).
    pub priority: u8,
}
5040
/// The x0x version string: the crate's Cargo package version
/// (`CARGO_PKG_VERSION`), embedded at compile time.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");

/// The name. Three bytes. A palindrome. A philosophy.
///
/// The unit tests pin it as a three-byte, ASCII-alphanumeric palindrome.
pub const NAME: &str = "x0x";
5046
5047#[cfg(test)]
5048mod tests {
5049    use super::*;
5050
5051    fn sa(s: &str) -> std::net::SocketAddr {
5052        s.parse().expect("valid SocketAddr literal in test")
5053    }
5054
5055    #[test]
5056    fn is_publicly_advertisable_rejects_lan_and_special_scopes() {
5057        // v4 non-global scopes
5058        assert!(
5059            !is_publicly_advertisable(sa("127.0.0.1:5483")),
5060            "loopback v4"
5061        );
5062        assert!(!is_publicly_advertisable(sa("10.1.2.3:5483")), "rfc1918 /8");
5063        assert!(
5064            !is_publicly_advertisable(sa("172.20.0.5:5483")),
5065            "rfc1918 /12"
5066        );
5067        assert!(
5068            !is_publicly_advertisable(sa("192.168.1.5:5483")),
5069            "rfc1918 /16"
5070        );
5071        assert!(
5072            !is_publicly_advertisable(sa("169.254.1.1:5483")),
5073            "link-local v4"
5074        );
5075        assert!(
5076            !is_publicly_advertisable(sa("100.64.1.1:5483")),
5077            "CGNAT (unreachable outside carrier)"
5078        );
5079        assert!(
5080            !is_publicly_advertisable(sa("0.0.0.0:5483")),
5081            "unspecified v4"
5082        );
5083
5084        // v6 non-global scopes
5085        assert!(!is_publicly_advertisable(sa("[::1]:5483")), "loopback v6");
5086        assert!(
5087            !is_publicly_advertisable(sa("[fe80::1]:5483")),
5088            "link-local v6"
5089        );
5090        assert!(!is_publicly_advertisable(sa("[fd00::1]:5483")), "ULA v6");
5091
5092        // port 0 never advertisable regardless of ip scope
5093        assert!(
5094            !is_publicly_advertisable(sa("1.2.3.4:0")),
5095            "port 0 on global v4"
5096        );
5097
5098        // Globally-routable positives
5099        assert!(is_publicly_advertisable(sa("1.2.3.4:5483")), "global v4");
5100        assert!(
5101            is_publicly_advertisable(sa("[2001:db8::1]:5483")),
5102            "global v6 (documentation doc but is_globally_routable permits)",
5103        );
5104        assert!(
5105            is_publicly_advertisable(sa("8.8.8.8:9000")),
5106            "global v4 on non-default port",
5107        );
5108
5109        // Reserved documentation ranges are correctly rejected by
5110        // is_globally_routable even though they are not RFC1918.
5111        assert!(
5112            !is_publicly_advertisable(sa("192.0.2.1:5483")),
5113            "TEST-NET-1 documentation range"
5114        );
5115        assert!(
5116            !is_publicly_advertisable(sa("203.0.113.10:5483")),
5117            "TEST-NET-3 documentation range"
5118        );
5119    }
5120
5121    #[test]
5122    fn presence_parse_addr_hints_drops_private_scopes() {
5123        // Older peers ship a mix of scopes. parse_addr_hints should return only
5124        // globally-advertisable entries so our dial loop never burns budget on
5125        // unreachable candidates.
5126        let hints = vec![
5127            "127.0.0.1:5483".to_string(),
5128            "10.200.0.1:5483".to_string(),
5129            "[fd00::1]:5483".to_string(),
5130            "1.2.3.4:5483".to_string(),
5131            "[2001:db8::1]:5483".to_string(),
5132            "not-an-address".to_string(),
5133        ];
5134        let parsed = presence::parse_addr_hints(&hints);
5135        let got: Vec<String> = parsed.iter().map(|a| a.to_string()).collect();
5136        assert_eq!(
5137            got,
5138            vec!["1.2.3.4:5483".to_string(), "[2001:db8::1]:5483".to_string()],
5139            "only globally-advertisable addresses survive inbound parsing"
5140        );
5141    }
5142
5143    #[test]
5144    fn name_is_palindrome() {
5145        let name = NAME;
5146        let reversed: String = name.chars().rev().collect();
5147        assert_eq!(name, reversed, "x0x must be a palindrome");
5148    }
5149
5150    #[test]
5151    fn name_is_three_bytes() {
5152        assert_eq!(NAME.len(), 3, "x0x must be exactly three bytes");
5153    }
5154
5155    #[test]
5156    fn name_is_ai_native() {
5157        // No uppercase, no spaces, no special chars that conflict
5158        // with shell, YAML, Markdown, or URL encoding
5159        assert!(NAME.chars().all(|c| c.is_ascii_alphanumeric()));
5160    }
5161
5162    #[tokio::test]
5163    async fn agent_creates() {
5164        let agent = Agent::new().await;
5165        assert!(agent.is_ok());
5166    }
5167
5168    #[tokio::test]
5169    async fn agent_joins_network() {
5170        let agent = Agent::new().await.unwrap();
5171        assert!(agent.join_network().await.is_ok());
5172    }
5173
5174    #[tokio::test]
5175    async fn agent_subscribes() {
5176        let agent = Agent::new().await.unwrap();
5177        // Currently returns error - will be implemented in Task 3
5178        assert!(agent.subscribe("test-topic").await.is_err());
5179    }
5180
5181    #[tokio::test]
5182    async fn identity_announcement_machine_signature_verifies() {
5183        let agent = Agent::builder()
5184            .with_network_config(network::NetworkConfig::default())
5185            .build()
5186            .await
5187            .unwrap();
5188
5189        let announcement = agent.build_identity_announcement(false, false).unwrap();
5190        assert_eq!(announcement.agent_id, agent.agent_id());
5191        assert_eq!(announcement.machine_id, agent.machine_id());
5192        assert!(announcement.user_id.is_none());
5193        assert!(announcement.agent_certificate.is_none());
5194        assert!(announcement.verify().is_ok());
5195    }
5196
5197    #[tokio::test]
5198    async fn identity_announcement_requires_human_consent() {
5199        let agent = Agent::builder()
5200            .with_network_config(network::NetworkConfig::default())
5201            .build()
5202            .await
5203            .unwrap();
5204
5205        let err = agent.build_identity_announcement(true, false).unwrap_err();
5206        assert!(
5207            err.to_string().contains("explicit human consent"),
5208            "unexpected error: {err}"
5209        );
5210    }
5211
5212    #[tokio::test]
5213    async fn identity_announcement_with_user_requires_user_identity() {
5214        let agent = Agent::builder()
5215            .with_network_config(network::NetworkConfig::default())
5216            .build()
5217            .await
5218            .unwrap();
5219
5220        let err = agent.build_identity_announcement(true, true).unwrap_err();
5221        assert!(
5222            err.to_string().contains("no user identity is configured"),
5223            "unexpected error: {err}"
5224        );
5225    }
5226
5227    #[tokio::test]
5228    async fn announce_identity_populates_discovery_cache() {
5229        let user_key = identity::UserKeypair::generate().unwrap();
5230        let agent = Agent::builder()
5231            .with_network_config(network::NetworkConfig::default())
5232            .with_user_key(user_key)
5233            .build()
5234            .await
5235            .unwrap();
5236
5237        agent.announce_identity(true, true).await.unwrap();
5238        let discovered = agent.discovered_agent(agent.agent_id()).await.unwrap();
5239        let entry = discovered.expect("agent should discover its own announcement");
5240
5241        assert_eq!(entry.agent_id, agent.agent_id());
5242        assert_eq!(entry.machine_id, agent.machine_id());
5243        assert_eq!(entry.user_id, agent.user_id());
5244    }
5245
5246    /// An announcement without NAT fields (as produced by old nodes) should still
5247    /// deserialise correctly via bincode — new fields are `Option` so `None` (0x00)
5248    /// is a valid encoding.
5249    #[test]
5250    fn identity_announcement_backward_compat_no_nat_fields() {
5251        use identity::{AgentId, MachineId};
5252
5253        // Build an announcement that omits the nat_* fields by serializing an old-style
5254        // struct that matches the pre-1.3 wire format.
5255        #[derive(serde::Serialize, serde::Deserialize)]
5256        struct OldIdentityAnnouncementUnsigned {
5257            agent_id: AgentId,
5258            machine_id: MachineId,
5259            user_id: Option<identity::UserId>,
5260            agent_certificate: Option<identity::AgentCertificate>,
5261            machine_public_key: Vec<u8>,
5262            addresses: Vec<std::net::SocketAddr>,
5263            announced_at: u64,
5264        }
5265
5266        let agent_id = AgentId([1u8; 32]);
5267        let machine_id = MachineId([2u8; 32]);
5268        let old = OldIdentityAnnouncementUnsigned {
5269            agent_id,
5270            machine_id,
5271            user_id: None,
5272            agent_certificate: None,
5273            machine_public_key: vec![0u8; 10],
5274            addresses: Vec::new(),
5275            announced_at: 1234,
5276        };
5277        let bytes = bincode::serialize(&old).expect("serialize old announcement");
5278
5279        // Attempt to deserialize as the new struct — this tests that the new fields
5280        // (which are Option<T>) do NOT break deserialization of the old format.
5281        // Note: bincode 1.x is not self-describing, so adding fields to a struct DOES
5282        // change the wire format.  This test documents the expected behavior.
5283        // Old format -> new struct: will fail because new struct has more fields.
5284        // New format -> old struct: will have trailing bytes.
5285        // This is acceptable — we document the protocol change.
5286        let result = bincode::deserialize::<IdentityAnnouncementUnsigned>(&bytes);
5287        // Old nodes produce shorter messages; new nodes cannot decode them as new structs.
5288        // This confirms the protocol is not transparent — nodes must upgrade together.
5289        assert!(
5290            result.is_err(),
5291            "Old-format announcement should not decode as new struct (protocol upgrade required)"
5292        );
5293    }
5294
5295    /// A new announcement with all NAT fields set round-trips through bincode.
5296    #[test]
5297    fn identity_announcement_nat_fields_round_trip() {
5298        use identity::{AgentId, MachineId};
5299
5300        let unsigned = IdentityAnnouncementUnsigned {
5301            agent_id: AgentId([1u8; 32]),
5302            machine_id: MachineId([2u8; 32]),
5303            user_id: None,
5304            agent_certificate: None,
5305            machine_public_key: vec![0u8; 10],
5306            addresses: Vec::new(),
5307            announced_at: 9999,
5308            nat_type: Some("FullCone".to_string()),
5309            can_receive_direct: Some(true),
5310            is_relay: Some(false),
5311            is_coordinator: Some(true),
5312        };
5313        let bytes = bincode::serialize(&unsigned).expect("serialize");
5314        let decoded: IdentityAnnouncementUnsigned =
5315            bincode::deserialize(&bytes).expect("deserialize");
5316        assert_eq!(decoded.nat_type.as_deref(), Some("FullCone"));
5317        assert_eq!(decoded.can_receive_direct, Some(true));
5318        assert_eq!(decoded.is_relay, Some(false));
5319        assert_eq!(decoded.is_coordinator, Some(true));
5320    }
5321
5322    /// An announcement with None for all NAT fields (e.g. network not started)
5323    /// round-trips correctly.
5324    #[test]
5325    fn identity_announcement_no_nat_fields_round_trip() {
5326        use identity::{AgentId, MachineId};
5327
5328        let unsigned = IdentityAnnouncementUnsigned {
5329            agent_id: AgentId([3u8; 32]),
5330            machine_id: MachineId([4u8; 32]),
5331            user_id: None,
5332            agent_certificate: None,
5333            machine_public_key: vec![0u8; 10],
5334            addresses: Vec::new(),
5335            announced_at: 42,
5336            nat_type: None,
5337            can_receive_direct: None,
5338            is_relay: None,
5339            is_coordinator: None,
5340        };
5341        let bytes = bincode::serialize(&unsigned).expect("serialize");
5342        let decoded: IdentityAnnouncementUnsigned =
5343            bincode::deserialize(&bytes).expect("deserialize");
5344        assert!(decoded.nat_type.is_none());
5345        assert!(decoded.can_receive_direct.is_none());
5346        assert!(decoded.is_relay.is_none());
5347        assert!(decoded.is_coordinator.is_none());
5348    }
5349}