Skip to main content

iicp_client/
node.rs

1// SPDX-License-Identifier: Apache-2.0
2//! IICP provider node — registration, heartbeats, and task serving.
3//!
4//! Implements:
5//! - `GET  /iicp/health`   — liveness / capacity (always 200)
6//! - `GET  /metrics`       — Prometheus text (503 if `metrics` feature absent)
7//! - `POST /v1/task`       — task handler with concurrency gate (IICP-E021),
8//!   nonce replay protection (IICP-E011), and W3C traceparent propagation.
9
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use axum::{
17    extract::State,
18    http::{HeaderMap, StatusCode},
19    response::{IntoResponse, Response},
20    routing::{get, post},
21    Json, Router,
22};
23use reqwest::Client;
24use serde::{Deserialize, Serialize};
25use serde_json::{json, Value};
26use tokio::net::TcpListener;
27use tokio::sync::Mutex;
28
29use crate::errors::{IicpError, Result};
30
/// Directory base URL used by [`NodeConfig::new`]; request paths such as
/// `/v1/register` are appended to it after trimming trailing slashes.
const DEFAULT_DIRECTORY: &str = "https://iicp.network/api";
/// Seconds between directory heartbeats — presumably consumed by the
/// heartbeat loop further down this file (TODO confirm).
const HEARTBEAT_INTERVAL_SECS: u64 = 30;
/// How long a task nonce is remembered for replay protection (five minutes).
const NONCE_TTL_SECS: u64 = 5 * 60;
34
/// Configuration for an IICP provider node.
///
/// Build one with [`NodeConfig::new`] (which fills in the defaults noted on the
/// fields below), adjust fields as needed, then hand it to [`IicpNode::new`].
#[derive(Debug, Clone)]
pub struct NodeConfig {
    /// Identifier for this node. Sent as `node_id` in the register payload
    /// only when non-empty.
    pub node_id: String,
    /// Endpoint advertised to the directory as `endpoint`. Replaced by
    /// [`IicpNode::apply_nat_profile`] when a reachable NAT profile reports a
    /// public endpoint.
    pub endpoint: String,
    /// Intent served by this node; advertised as `capabilities[0].intent`.
    pub intent: String,
    /// Primary model name. When set it becomes the first entry of
    /// `capabilities[0].models` in the register payload. Default `None`.
    pub model: Option<String>,
    /// Region advertised to the directory. `register()` substitutes
    /// `"eu-central"` when this is `None` (the default).
    pub region: Option<String>,
    /// Legacy list of model names; `register()` appends each entry that is not
    /// already present to `capabilities[0].models`. Default empty.
    pub capabilities: Vec<String>,
    /// Base URL of the directory (default `DEFAULT_DIRECTORY`). Trailing
    /// slashes are trimmed before paths such as `/v1/register` are appended.
    pub directory_url: String,
    /// Timeout in milliseconds (default 5000). `IicpNode::new` gives its HTTP
    /// client this value plus a 2000 ms margin.
    pub timeout_ms: u64,
    /// Maximum concurrent tasks; excess requests receive 429 IICP-E021.
    /// Default 4. Availability windows may lower the effective cap at serve time.
    pub max_concurrent: usize,
    /// Tokens-per-minute capacity declared to directory (`limits.tokens_per_min`).
    /// Default 10 000.
    pub tokens_per_min: u32,
    /// Per-request token cap declared on the capability object (`capabilities[].max_tokens`).
    /// Default 8192.
    pub max_tokens: u32,
    /// Optional native IICP binary endpoint (spec/iicp-dir.md v0.7.0).
    /// Scheme MUST be `iicp://` (plaintext) or `iicpsec://` (TLS).
    /// Default IICP port is 9484 (ADR-040). When set, the directory persists it
    /// and clients SHOULD prefer it over `endpoint` for task CALLs.
    pub transport_endpoint: Option<String>,
    /// #331 Phase A.1 / ADR-041 — NAT-traversal observability fields surfaced
    /// to the directory in the register payload. Populated by
    /// [`IicpNode::apply_nat_profile`] when an operator runs detect_nat at
    /// startup, OR set manually if the operator already knows their topology.
    ///
    /// `transport_method` is one of `direct` / `upnp_mapped` / `stun_hole_punch`
    /// / `turn_relay` / `external_tunnel` / `unknown`.
    pub transport_method: Option<String>,
    /// One of `full_cone` / `restricted_cone` / `port_restricted` / `symmetric`
    /// / `unknown` (observability only).
    pub nat_type: Option<String>,
    /// Forward-compat slot for ADR-041 transport_candidates[] + relay_endpoint.
    /// `apply_nat_profile` currently stores `{"tier", "detection_log_tail"}` here.
    pub transport_metadata: Option<serde_json::Value>,
    /// ADR-043 §9 — 8-category exposure_mode, computed by `qualify_service` and set
    /// in `apply_nat_profile`. Surfaced to the directory `nodes.exposure_mode` column (#344).
    pub exposure_mode: Option<String>,
    /// S.12 §2.1 CIP policy block surfaced to the directory register payload.
    /// When `None`, register() falls back to the module-level
    /// [`crate::cip_policy::get_cip_policy`] — operators can configure once
    /// and have it apply to all nodes that don't override.
    pub cip_policy: Option<std::sync::Arc<crate::cip_policy::CooperativeInferencePolicy>>,
    /// ADR-019 declarative pricing block. When `None`, the SDK does not
    /// advertise pricing and the directory defaults to a 1.0 multiplier.
    pub pricing: Option<crate::pricing::PricingConfig>,
    /// Operator-provisioned HMAC key for ADR-019 pricing signatures. When
    /// empty, the SDK captures the directory-issued key from the register
    /// response and uses it for subsequent signing. When non-empty it is also
    /// sent as `node_hmac_key` in the register payload.
    pub node_hmac_key: String,
    /// Phase 3+ availability windows (ADR-006). Local-time "HH:MM" windows that
    /// shape the effective capacity advertised to the directory and gated at
    /// serve time. Empty → always full capacity. See [`crate::availability`].
    pub availability_windows: Vec<crate::availability::Window>,
    /// ADR-010 task_id idempotency. `false` by default to preserve the pre-0.6
    /// contract (a task_id may be resubmitted). When `true`, a duplicate task_id
    /// within the 5-minute window is rejected with IICP-E010.
    pub enable_idempotency: bool,
    /// Phase 2 mesh (ADR-009/022). When `true`, serve() gossips peers and exposes
    /// POST /v1/peers. Default false.
    pub enable_mesh: bool,
    /// When `true`, serve() exposes POST /v1/relay to forward tasks to peers learned
    /// via gossip (ADR-022). Requires `enable_mesh`. Default false.
    pub relay_capable: bool,
    /// Port for the RelayAcceptServer (R1 relay-as-last-resort, #341).
    /// Workers behind CGNAT connect here outbound and send RELAY_BIND. Default 9485.
    pub relay_accept_port: u16,
    /// R2: when set, this node acts as a relay WORKER — connects outbound to the
    /// specified relay endpoint. Format: "host:port" (e.g. "relay.example.com:9485").
    pub relay_worker_endpoint: Option<String>,
}
106
107impl NodeConfig {
108    pub fn new(
109        node_id: impl Into<String>,
110        endpoint: impl Into<String>,
111        intent: impl Into<String>,
112    ) -> Self {
113        Self {
114            node_id: node_id.into(),
115            endpoint: endpoint.into(),
116            intent: intent.into(),
117            model: None,
118            region: None,
119            capabilities: vec![],
120            directory_url: DEFAULT_DIRECTORY.into(),
121            timeout_ms: 5_000,
122            max_concurrent: 4,
123            tokens_per_min: 10_000,
124            max_tokens: 8_192,
125            transport_endpoint: None,
126            transport_method: None,
127            nat_type: None,
128            transport_metadata: None,
129            exposure_mode: None,
130            cip_policy: None,
131            pricing: None,
132            node_hmac_key: String::new(),
133            availability_windows: Vec::new(),
134            enable_idempotency: false,
135            enable_mesh: false,
136            relay_capable: false,
137            relay_accept_port: 9485,
138            relay_worker_endpoint: None,
139        }
140    }
141}
142
/// JSON body accepted by `POST /v1/task`.
#[derive(Debug, Deserialize)]
pub struct TaskRequest {
    /// Caller-chosen task identifier. Echoed back in [`TaskResponse::task_id`];
    /// checked for duplicates (IICP-E010) when idempotency is enabled.
    pub task_id: String,
    /// Requested intent; recorded as `iicp.intent` on the `iicp.task.execute` span.
    pub intent: String,
    /// Task input, handed to the registered handler as part of this request.
    pub payload: Value,
    /// Optional constraints object. The string key `qos_class` selects the
    /// admission tier (default `"best_effort"`).
    pub constraints: Option<Value>,
    /// Optional auth object. Not read by the request handling visible in this
    /// file — presumably consumed by the task handler (TODO confirm).
    pub auth: Option<Value>,
    /// Optional replay-protection nonce; a nonce reused within `NONCE_TTL_SECS`
    /// is rejected with 409 IICP-E011.
    pub nonce: Option<String>,
    /// Injected server-side from the W3C `traceparent` header — not from the JSON body.
    #[serde(skip_deserializing)]
    pub _trace: Option<Value>,
}
155
/// JSON body returned by `POST /v1/task` once the handler has run.
#[derive(Debug, Serialize)]
pub struct TaskResponse {
    /// Echo of the request's `task_id`.
    pub task_id: String,
    /// `"completed"` when the handler succeeded, `"error"` when it failed.
    pub status: String,
    /// Handler output; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    /// Error object (`{"message": ...}`); omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<Value>,
}
165
/// Type-erased async task handler shared by all requests.
///
/// Called with the parsed [`TaskRequest`], whose `_trace` field is already
/// filled from a `traceparent` header when one was sent. `Ok(value)` is
/// returned to the caller as `result` with status `"completed"`. `Err(e)`
/// becomes a 500 with status `"error"` and `e.to_string()` as the message.
/// The `Send + Sync` bounds let one `Arc` be shared across request tasks.
pub type TaskHandlerFn = Arc<
    dyn Fn(
            TaskRequest,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send>>
        + Send
        + Sync,
>;
173
/// Shared state behind the axum handlers in this module.
struct AppState {
    /// User task handler invoked by `POST /v1/task`.
    handler: TaskHandlerFn,
    /// Echoed by `/iicp/health`.
    node_id: String,
    /// Echoed by `/iicp/health`.
    region: String,
    /// Echoed by `/iicp/health`.
    intent: String,
    /// Echoed by `/iicp/health`.
    model: String,
    /// Number of tasks currently holding a concurrency slot (see `admit`).
    active_jobs: Arc<AtomicUsize>,
    /// Configured concurrency cap, before availability-window adjustment.
    max_concurrent: usize,
    /// ADR-006 availability windows; yields the effective cap at serve time.
    availability: Arc<crate::availability::AvailabilityEvaluator>,
    /// ADR-010 duplicate-task_id guard; consulted only when `enable_idempotency`.
    idempotency: Arc<crate::idempotency::IdempotencyGuard>,
    enable_idempotency: bool,
    /// ADR-009/022 peer list backing `/v1/peers` and `/v1/relay`.
    peer_manager: Arc<crate::peer_manager::PeerManager>,
    /// Outbound HTTP client used by `/v1/relay` to forward tasks to peers.
    http: reqwest::Client,
    /// Nonces seen recently, with their insertion time (expired after `NONCE_TTL_SECS`).
    nonce_cache: Arc<Mutex<HashMap<String, Instant>>>,
    /// #343 — shared pinhole state for /iicp/health surface.
    pinhole_uid: Arc<std::sync::RwLock<Option<u32>>>,
    pinhole_lease_seconds: Arc<std::sync::RwLock<u32>>,
    /// R1 relay-as-last-resort (#341): sessions from workers binding outbound.
    #[cfg(feature = "iicp-tcp")]
    relay_sessions: Arc<crate::relay_session::RelaySessionRegistry>,
}
195
196// ── GET /iicp/health ─────────────────────────────────────────────────────────
197
198async fn health_endpoint(State(state): State<Arc<AppState>>) -> impl IntoResponse {
199    let active = state.active_jobs.load(Ordering::Relaxed);
200    let uid = state.pinhole_uid.read().ok().and_then(|g| *g);
201    let lease = state
202        .pinhole_lease_seconds
203        .read()
204        .map(|g| *g)
205        .unwrap_or(3600);
206    let pinhole_state = if let Some(uid) = uid {
207        json!({ "active": true, "unique_id": uid, "lease_seconds": lease })
208    } else {
209        json!({ "active": false })
210    };
211    let eff_max = state
212        .availability
213        .effective_max_concurrent(state.max_concurrent);
214    Json(json!({
215        "status": "ok",
216        "node_id": state.node_id,
217        "region": state.region,
218        "load": (active as f64 / state.max_concurrent.max(1) as f64),
219        "active_jobs": active,
220        "max_concurrent": state.max_concurrent,
221        "effective_max_concurrent": eff_max,
222        "available": active < eff_max,
223        "model": state.model,
224        "intent": state.intent,
225        "pinhole_state": pinhole_state,
226    }))
227}
228
229// ── GET /metrics ─────────────────────────────────────────────────────────────
230
231async fn metrics_endpoint() -> Response {
232    #[cfg(feature = "metrics")]
233    {
234        use prometheus::{Encoder, TextEncoder};
235        let encoder = TextEncoder::new();
236        let mf = prometheus::gather();
237        let mut buf = Vec::new();
238        if encoder.encode(&mf, &mut buf).is_ok() {
239            return (
240                StatusCode::OK,
241                [(
242                    axum::http::header::CONTENT_TYPE,
243                    "text/plain; version=0.0.4",
244                )],
245                buf,
246            )
247                .into_response();
248        }
249    }
250    (
251        StatusCode::SERVICE_UNAVAILABLE,
252        "metrics feature not enabled",
253    )
254        .into_response()
255}
256
257// ── POST /v1/peers (ADR-009 gossip exchange) ──────────────────────────────────
258
259async fn peers_endpoint(
260    State(state): State<Arc<AppState>>,
261    headers: HeaderMap,
262    body: axum::body::Bytes,
263) -> Response {
264    let sig = headers
265        .get("x-iicp-signature")
266        .and_then(|v| v.to_str().ok());
267    if !state.peer_manager.verify_exchange(&body, sig) {
268        return (
269            StatusCode::UNAUTHORIZED,
270            Json(json!({"error":{"code":"IICP-E012","message":"invalid_signature"}})),
271        )
272            .into_response();
273    }
274    if let Ok(parsed) = serde_json::from_slice::<Value>(&body) {
275        if let Some(arr) = parsed.get("known_peers").and_then(Value::as_array) {
276            let dicts: Vec<Value> = arr.iter().filter(|p| p.is_object()).cloned().collect();
277            state.peer_manager.merge_peers(&dicts);
278        }
279    }
280    let peers: Vec<Value> = state
281        .peer_manager
282        .get_peers()
283        .iter()
284        .map(|p| {
285            json!({
286                "node_id": p.node_id,
287                "endpoint": p.endpoint,
288                "region": p.region,
289                "last_seen": p.last_seen,
290            })
291        })
292        .collect();
293    Json(json!({ "peers": peers })).into_response()
294}
295
296// ── POST /v1/relay (ADR-022 mesh relay) ───────────────────────────────────────
297
298async fn relay_endpoint(
299    State(state): State<Arc<AppState>>,
300    Json(payload): Json<Value>,
301) -> Response {
302    let target_id = payload
303        .get("target_node_id")
304        .and_then(Value::as_str)
305        .unwrap_or("");
306    let task = payload.get("task");
307    if target_id.is_empty() || task.is_none() {
308        return (
309            StatusCode::UNPROCESSABLE_ENTITY,
310            Json(
311                json!({"error":{"code":"IICP-E000","message":"target_node_id and task required"}}),
312            ),
313        )
314            .into_response();
315    }
316    let task_val = task.expect("checked above").clone();
317
318    // R1: check relay session registry first (CGNAT workers with no inbound endpoint)
319    #[cfg(feature = "iicp-tcp")]
320    if let Some(session) = state.relay_sessions.get(target_id) {
321        match session.forward_task(&task_val, 120).await {
322            Ok(result) => {
323                let task_id = task_val
324                    .get("task_id")
325                    .and_then(Value::as_str)
326                    .unwrap_or("");
327                return Json(json!({
328                    "task_id": task_id,
329                    "status": "completed",
330                    "result": result
331                }))
332                .into_response();
333            }
334            Err(e) => {
335                return (
336                    StatusCode::BAD_GATEWAY,
337                    Json(json!({"error":{"code":"IICP-E031","message":format!("relay session forward failed: {e}")}})),
338                )
339                    .into_response();
340            }
341        }
342    }
343
344    // Fall back to HTTP forwarding for routable peers (ADR-022)
345    let target = match state.peer_manager.relay_target(target_id) {
346        Some(t) => t,
347        None => {
348            return (
349                StatusCode::NOT_FOUND,
350                Json(json!({"error":{"code":"IICP-E030","message":"target not in peer list and not a bound relay worker"}})),
351            )
352                .into_response();
353        }
354    };
355    let url = format!("{}/v1/task", target.endpoint.trim_end_matches('/'));
356    match state
357        .http
358        .post(&url)
359        .timeout(Duration::from_secs(120))
360        .json(&task_val)
361        .send()
362        .await
363    {
364        Ok(resp) => {
365            let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::OK);
366            let bytes = resp.bytes().await.unwrap_or_default();
367            (status, bytes).into_response()
368        }
369        Err(e) => (
370            StatusCode::BAD_GATEWAY,
371            Json(json!({"error":{"code":"IICP-E031","message":format!("relay failed: {e}")}})),
372        )
373            .into_response(),
374    }
375}
376
377// ── POST /v1/task ─────────────────────────────────────────────────────────────
378
379/// Try to claim a concurrency slot. On `true` the caller owns one increment of
380/// `active_jobs` and MUST `fetch_sub` it on every exit path. realtime/interactive
381/// wait briefly for a slot; other tiers fail fast so the proxy sees back-pressure
382/// immediately (ADR-006; see [`crate::scheduler`]).
383async fn admit(state: &AppState, qos: &str) -> bool {
384    // Effective cap folds in availability windows (ADR-006): a reduced/closed
385    // window lowers capacity below max_concurrent.
386    let cap = state
387        .availability
388        .effective_max_concurrent(state.max_concurrent);
389    let prev = state.active_jobs.fetch_add(1, Ordering::Relaxed);
390    if prev < cap {
391        return true;
392    }
393    state.active_jobs.fetch_sub(1, Ordering::Relaxed);
394    if !crate::scheduler::is_queue_eligible(qos) {
395        return false;
396    }
397    let deadline = Instant::now() + crate::scheduler::QUEUE_WAIT;
398    while Instant::now() < deadline {
399        tokio::time::sleep(Duration::from_millis(50)).await;
400        let cap = state
401            .availability
402            .effective_max_concurrent(state.max_concurrent);
403        let prev = state.active_jobs.fetch_add(1, Ordering::Relaxed);
404        if prev < cap {
405            return true;
406        }
407        state.active_jobs.fetch_sub(1, Ordering::Relaxed);
408    }
409    false
410}
411
412async fn task_endpoint(
413    State(state): State<Arc<AppState>>,
414    headers: HeaderMap,
415    Json(mut req): Json<TaskRequest>,
416) -> Response {
417    // QoS-aware admission — IICP-E021
418    let qos = req
419        .constraints
420        .as_ref()
421        .and_then(|c| c.get("qos_class"))
422        .and_then(|v| v.as_str())
423        .unwrap_or("best_effort")
424        .to_string();
425    if !admit(&state, &qos).await {
426        return (
427            StatusCode::TOO_MANY_REQUESTS,
428            [("Retry-After", "2"), ("Content-Type", "application/json")],
429            Json(json!({
430                "error": {
431                    "code": "IICP-E021",
432                    "message": "capacity_exceeded",
433                    "qos_class": qos,
434                    "retry_after_ms": 2000,
435                }
436            })),
437        )
438            .into_response();
439    }
440
441    // Nonce replay protection — IICP-E011
442    if let Some(ref nonce) = req.nonce {
443        let mut cache = state.nonce_cache.lock().await;
444        cache.retain(|_, inserted_at| inserted_at.elapsed().as_secs() < NONCE_TTL_SECS);
445        if cache.contains_key(nonce) {
446            state.active_jobs.fetch_sub(1, Ordering::Relaxed);
447            return (
448                StatusCode::CONFLICT,
449                Json(json!({
450                    "error": { "code": "IICP-E011", "message": "replay_detected" }
451                })),
452            )
453                .into_response();
454        }
455        cache.insert(nonce.clone(), Instant::now());
456    }
457
458    // Idempotency — duplicate task_id within the retry window (ADR-010). Opt-in
459    // (NodeConfig.enable_idempotency) to preserve the pre-0.6 contract.
460    if state.enable_idempotency && !state.idempotency.check_and_register(&req.task_id) {
461        state.active_jobs.fetch_sub(1, Ordering::Relaxed);
462        return (
463            StatusCode::CONFLICT,
464            Json(json!({
465                "error": { "code": "IICP-E010", "message": "duplicate_task" }
466            })),
467        )
468            .into_response();
469    }
470
471    // W3C traceparent propagation
472    if let Some(tp) = headers.get("traceparent").and_then(|v| v.to_str().ok()) {
473        req._trace = Some(json!({ "traceparent": tp }));
474    }
475
476    let task_id = req.task_id.clone();
477    // ADR-014 TRACE-02 — iicp.task.execute span via `tracing` crate.
478    // `tracing-opentelemetry` bridge propagates this to an OTLP collector when
479    // OTEL_EXPORTER_OTLP_ENDPOINT is set and the operator configures the bridge
480    // at startup (e.g. via opentelemetry-otlp + tracing-opentelemetry).
481    let result = {
482        let span = tracing::info_span!(
483            "iicp.task.execute",
484            "iicp.task_id" = %task_id,
485            "iicp.intent" = %req.intent,
486        );
487        let _guard = span.enter();
488        (state.handler)(req).await
489    };
490    state.active_jobs.fetch_sub(1, Ordering::Relaxed);
491
492    match result {
493        Ok(value) => Json(TaskResponse {
494            task_id,
495            status: "completed".into(),
496            result: Some(value),
497            error: None,
498        })
499        .into_response(),
500        Err(e) => (
501            StatusCode::INTERNAL_SERVER_ERROR,
502            Json(TaskResponse {
503                task_id,
504                status: "error".into(),
505                result: None,
506                error: Some(json!({ "message": e.to_string() })),
507            }),
508        )
509            .into_response(),
510    }
511}
512
513// ── IicpNode ──────────────────────────────────────────────────────────────────
514
/// IICP provider node — handles registration, heartbeats, and task serving.
pub struct IicpNode {
    /// Node configuration (see [`IicpNode::cfg`]).
    cfg: NodeConfig,
    /// HTTP client for directory requests, built in [`IicpNode::new`] with a
    /// timeout derived from `cfg.timeout_ms`.
    http: Client,
    /// ADR-019 HMAC key used for signing pricing declarations. Initialized
    /// from `cfg.node_hmac_key`; populated from the directory's response on
    /// first register() so subsequent re-registrations sign with the
    /// directory-issued key.
    runtime_hmac_key: std::sync::RwLock<String>,
    /// BUG-5: token stashed by register() so deregister()/heartbeat don't need it re-passed.
    runtime_token: std::sync::RwLock<String>,
    /// #343 — UPnP IPv6 pinhole UID captured by `apply_nat_profile`, revoked
    /// on shutdown via [`Self::revoke_pinhole`]. Only read under the `nat`
    /// feature; allowed dead_code so non-nat builds compile cleanly.
    #[allow(dead_code)]
    pinhole_uid: std::sync::RwLock<Option<u32>>,
    /// #343 — lease of the tracked pinhole in seconds (3600 until a NAT
    /// profile reports one). Same `nat`-only usage as `pinhole_uid`.
    #[allow(dead_code)]
    pinhole_lease_seconds: std::sync::RwLock<u32>,
}
534
535impl IicpNode {
536    pub fn new(cfg: NodeConfig) -> Self {
537        let http = Client::builder()
538            .timeout(Duration::from_millis(cfg.timeout_ms + 2_000))
539            .use_rustls_tls()
540            .build()
541            .expect("failed to build HTTP client");
542        let runtime_hmac_key = std::sync::RwLock::new(cfg.node_hmac_key.clone());
543        Self {
544            cfg,
545            http,
546            runtime_hmac_key,
547            runtime_token: std::sync::RwLock::new(String::new()),
548            pinhole_uid: std::sync::RwLock::new(None),
549            pinhole_lease_seconds: std::sync::RwLock::new(3600),
550        }
551    }
552
553    /// Current HMAC key in use for ADR-019 pricing signatures (empty if
554    /// unregistered AND no operator-provisioned key).
555    pub fn node_hmac_key(&self) -> String {
556        self.runtime_hmac_key.read().expect("poisoned").clone()
557    }
558
559    /// Borrow this node's configuration. Useful for callers (e.g.
560    /// [`crate::conformance::run_conformance_checks`]) that need to inspect
561    /// `directory_url`, `endpoint`, or `node_id` without owning the config.
562    pub fn cfg(&self) -> &NodeConfig {
563        &self.cfg
564    }
565
566    /// Set the relay-worker endpoint after construction. Used by the CLI when a
567    /// relay is auto-elected post-NAT-detection (tier ≥ 3): `serve()` reads
568    /// `self.cfg.relay_worker_endpoint` to start the outbound relay session.
569    pub fn set_relay_worker_endpoint(&mut self, endpoint: String) {
570        self.cfg.relay_worker_endpoint = Some(endpoint);
571    }
572
573    /// Populate `endpoint`, `transport_endpoint`, and the NAT observability
574    /// fields from a `NatProfile` produced by [`crate::nat_detection::detect_nat`].
575    ///
576    /// Operators typically call this right after `detect_nat()` and before
577    /// `register()` so the directory receives the discovered public endpoint
578    /// + transport_method/nat_type/transport_metadata in the same payload.
579    ///
580    /// Defensive: tier-4 (unreachable) profiles do NOT overwrite a manually-
581    /// set endpoint, and `transport_method == "unreachable"` is filtered out
582    /// before register.
583    #[cfg(feature = "nat")]
584    pub fn apply_nat_profile(&mut self, profile: &crate::nat_detection::NatProfile) {
585        if profile.is_reachable() {
586            if let Some(pub_ep) = &profile.public_endpoint {
587                self.cfg.endpoint = pub_ep.clone();
588            }
589        }
590        if let Some(tep) = &profile.transport_endpoint {
591            self.cfg.transport_endpoint = Some(tep.clone());
592        }
593        let tm = match profile.transport_method {
594            crate::nat_detection::TransportMethod::Direct => Some("direct"),
595            crate::nat_detection::TransportMethod::UpnpMapped => Some("upnp_mapped"),
596            crate::nat_detection::TransportMethod::StunHolePunch => Some("stun_hole_punch"),
597            crate::nat_detection::TransportMethod::TurnRelay => Some("turn_relay"),
598            crate::nat_detection::TransportMethod::ExternalTunnel => Some("external_tunnel"),
599            crate::nat_detection::TransportMethod::Unreachable => None,
600        };
601        if let Some(name) = tm {
602            self.cfg.transport_method = Some(name.into());
603        }
604        if self.cfg.nat_type.is_none() {
605            self.cfg.nat_type = Some("unknown".into());
606        }
607        let tail: Vec<&str> = profile
608            .detection_log
609            .iter()
610            .rev()
611            .take(1)
612            .map(|s| s.as_str())
613            .collect();
614        self.cfg.transport_metadata = Some(serde_json::json!({
615            "tier": profile.tier,
616            "detection_log_tail": tail,
617        }));
618        // ADR-043 §9 (#344) — derive the canonical 8-category exposure_mode and
619        // advertise it so the directory can store nodes.exposure_mode for routing.
620        self.cfg.exposure_mode = Some(
621            crate::qualify::qualify_service(profile)
622                .exposure_mode
623                .to_string(),
624        );
625        // #343 — capture the IPv6 firewall pinhole UID and lease so we can renew and revoke.
626        if let Some(v6) = &profile.ipv6 {
627            if v6.pinhole_active {
628                if let Some(uid) = v6.pinhole_unique_id {
629                    if let Ok(mut slot) = self.pinhole_uid.write() {
630                        *slot = Some(uid);
631                    }
632                }
633                if let Some(lease) = v6.pinhole_lease_seconds {
634                    if let Ok(mut slot) = self.pinhole_lease_seconds.write() {
635                        *slot = lease;
636                    }
637                }
638            }
639        }
640    }
641
642    /// #343 — close the UPnP IPv6 firewall pinhole if one is tracked. Best-effort.
643    #[cfg(feature = "nat")]
644    pub async fn revoke_pinhole(&self) -> bool {
645        let uid = match self.pinhole_uid.write() {
646            Ok(mut slot) => slot.take(),
647            Err(_) => None,
648        };
649        match uid {
650            Some(uid) => crate::nat_detection::delete_ipv6_pinhole(uid).await,
651            None => false,
652        }
653    }
654
655    /// Tell the directory this node is going away.
656    ///
657    /// Mirrors `iicp_client.IicpNode.deregister` (Python iter-1471) and
658    /// `IicpNode.deregister` (TS iter-1474). Best-effort: shutdown paths
659    /// swallow failures so a flaky directory connection doesn't block exit.
660    /// Deregister from the directory. `node_token` defaults to the token stashed by
661    /// `register()` (BUG-5) when `None` — pass `Some(token)` to override.
662    pub async fn deregister(&self, node_token: Option<&str>) -> Result<()> {
663        let stashed = self.runtime_token.read().expect("poisoned").clone();
664        let token = node_token.map(str::to_string).unwrap_or(stashed);
665        if token.is_empty() {
666            return Err(crate::errors::IicpError::Node(
667                "deregister() requires a node_token (none stashed — call register() first)".into(),
668            ));
669        }
670        let url = format!(
671            "{}/v1/register",
672            self.cfg.directory_url.trim_end_matches('/')
673        );
674        let resp = self
675            .http
676            .delete(&url)
677            .bearer_auth(&token)
678            .json(&serde_json::json!({"node_id": self.cfg.node_id}))
679            .send()
680            .await?;
681        let status = resp.status();
682        if !status.is_success() && status.as_u16() != 404 {
683            return Err(crate::errors::IicpError::Node(format!(
684                "Deregister failed: {status}"
685            )));
686        }
687        Ok(())
688    }
689
690    /// Register with the directory and return the assigned `node_token`.
691    ///
692    /// Payload conforms to spec/iicp-dir.md §3.1 REGISTER plus the v0.7.0
693    /// dual-endpoint extension (`transport_endpoint`). Pre-iter-1413
694    /// builds sent a non-spec flat-`intent` shape that the production
695    /// directory rejects with 422; fixed here.
696    pub async fn register(&self) -> Result<String> {
697        // Build the spec-compliant capability object. Legacy
698        // `capabilities: Vec<String>` is folded into the models array.
699        let mut models: Vec<String> = match &self.cfg.model {
700            Some(m) => vec![m.clone()],
701            None => Vec::new(),
702        };
703        for cap in &self.cfg.capabilities {
704            if !models.contains(cap) {
705                models.push(cap.clone());
706            }
707        }
708        let region = self
709            .cfg
710            .region
711            .clone()
712            .unwrap_or_else(|| "eu-central".to_string());
713
714        let mut payload = json!({
715            "endpoint": self.cfg.endpoint,
716            "region": region,
717            "capabilities": [{
718                "intent": self.cfg.intent,
719                "models": models,
720                "max_tokens": self.cfg.max_tokens,
721            }],
722            "limits": {
723                "max_concurrent": self.cfg.max_concurrent,
724                "tokens_per_min": self.cfg.tokens_per_min,
725            },
726        });
727        if !self.cfg.node_id.is_empty() {
728            payload["node_id"] = json!(self.cfg.node_id);
729        }
730        // spec v0.7.0 — native IICP binary endpoint
731        if let Some(t) = &self.cfg.transport_endpoint {
732            payload["transport_endpoint"] = json!(t);
733        }
734        // #331 / ADR-041 — NAT-traversal observability (set manually or via
735        // apply_nat_profile after detect_nat)
736        if let Some(m) = &self.cfg.transport_method {
737            payload["transport_method"] = json!(m);
738        }
739        if let Some(n) = &self.cfg.nat_type {
740            payload["nat_type"] = json!(n);
741        }
742        if let Some(md) = &self.cfg.transport_metadata {
743            payload["transport_metadata"] = md.clone();
744        }
745        // ADR-043 §9 (#344) — 8-category network exposure classification
746        if let Some(e) = &self.cfg.exposure_mode {
747            payload["exposure_mode"] = json!(e);
748        }
749
750        // SDK self-identification — directory surfaces these on /v1/discover
751        // so dashboards can render a language badge. Free-form so future
752        // SDKs in other languages can self-tag without a directory change.
753        payload["sdk_language"] = json!("rust");
754        payload["sdk_version"] = json!(env!("CARGO_PKG_VERSION"));
755
756        // S.12 §2.1 CIP-D1 policy block. Use the per-config policy if set,
757        // otherwise fall back to the module-level cip_policy::get_cip_policy().
758        let policy_arc = self
759            .cfg
760            .cip_policy
761            .clone()
762            .unwrap_or_else(crate::cip_policy::get_cip_policy);
763        if let Some(block) = policy_arc.as_register_policy_block() {
764            payload["policy"] = block;
765        }
766
767        // ADR-019 — declarative pricing block. Operator opt-in.
768        if let Some(pricing) = &self.cfg.pricing {
769            let hmac_key = self.runtime_hmac_key.read().expect("poisoned").clone();
770            payload["pricing"] = crate::pricing::build_pricing_block(pricing, &hmac_key);
771        }
772        if !self.cfg.node_hmac_key.is_empty() {
773            payload["node_hmac_key"] = json!(self.cfg.node_hmac_key);
774        }
775
776        let resp = self
777            .http
778            .post(format!(
779                "{}/v1/register",
780                self.cfg.directory_url.trim_end_matches('/')
781            ))
782            .json(&payload)
783            .send()
784            .await
785            .map_err(|e| IicpError::Node(e.to_string()))?;
786
787        if !resp.status().is_success() {
788            return Err(IicpError::Node(format!(
789                "register failed: {}",
790                resp.status()
791            )));
792        }
793        let data: Value = resp
794            .json()
795            .await
796            .map_err(|e| IicpError::Node(e.to_string()))?;
797        let token = data["node_token"]
798            .as_str()
799            .or_else(|| data["token"].as_str())
800            .ok_or_else(|| IicpError::Node(format!("no node_token in response: {data}")))?;
801        // BUG-5: stash the token so deregister()/heartbeat don't need it re-passed.
802        *self.runtime_token.write().expect("poisoned") = token.to_string();
803        // ADR-019: capture directory-issued HMAC key for subsequent signing.
804        // Operator-provisioned key (cfg.node_hmac_key) wins — we only set the
805        // runtime key from the response when the operator hasn't set one.
806        if self.cfg.node_hmac_key.is_empty() {
807            if let Some(dir_key) = data["node_hmac_key"].as_str() {
808                if !dir_key.is_empty() {
809                    let mut guard = self.runtime_hmac_key.write().expect("poisoned");
810                    *guard = dir_key.to_string();
811                }
812            }
813        }
814        Ok(token.to_string())
815    }
816
817    /// Send a single heartbeat to the directory.
818    pub async fn heartbeat(&self, node_token: &str) -> Result<()> {
819        let resp = self
820            .http
821            // /v1/heartbeat — default directory_url already ends in /api;
822            // the prior /api/v1/heartbeat path doubled the prefix and 404'd,
823            // so last_seen never updated and nodes vanished from /v1/stats.
824            .post(format!(
825                "{}/v1/heartbeat",
826                self.cfg.directory_url.trim_end_matches('/')
827            ))
828            // NodeTokenAuth middleware requires Bearer auth; the body
829            // token is retained for back-compat with older directory builds.
830            .bearer_auth(node_token)
831            .json(&json!({
832                "node_id": self.cfg.node_id,
833                "node_token": node_token,
834                "status": "available",
835                // Live capacity after availability shaping (ADR-006).
836                "max_concurrent": crate::availability::AvailabilityEvaluator::new(
837                    self.cfg.availability_windows.clone(),
838                )
839                .effective_max_concurrent(self.cfg.max_concurrent),
840            }))
841            .send()
842            .await
843            .map_err(|e| IicpError::Node(e.to_string()))?;
844
845        if !resp.status().is_success() {
846            return Err(IicpError::Node(format!(
847                "heartbeat failed: {}",
848                resp.status()
849            )));
850        }
851        Ok(())
852    }
853
854    /// Start the task server (blocks until cancelled).
855    ///
856    /// Serves `POST /v1/task`, `GET /iicp/health`, `GET /metrics`.
857    /// Starts a background heartbeat loop when `node_token` is provided.
858    pub async fn serve<F, Fut>(
859        &self,
860        handler: F,
861        addr: &str,
862        node_token: Option<String>,
863    ) -> Result<()>
864    where
865        F: Fn(TaskRequest) -> Fut + Send + Sync + 'static,
866        Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
867    {
868        let handler: TaskHandlerFn = Arc::new(move |req| Box::pin(handler(req)));
869        // Clone before handler is potentially moved into the relay worker closure (iicp-tcp only).
870        #[cfg(feature = "iicp-tcp")]
871        let handler_for_relay = Arc::clone(&handler);
872        // Extract bind host before `addr` is shadowed by SocketAddr (iicp-tcp only).
873        #[cfg(feature = "iicp-tcp")]
874        let bind_host: String = addr.split(':').next().unwrap_or("0.0.0.0").to_string();
875        let active_jobs = Arc::new(AtomicUsize::new(0));
876        let nonce_cache = Arc::new(Mutex::new(HashMap::new()));
877        // #343 — shared pinhole state: pass to AppState (health endpoint) and renewal task.
878        let shared_pinhole_uid: Arc<std::sync::RwLock<Option<u32>>> = Arc::new(
879            std::sync::RwLock::new(self.pinhole_uid.read().ok().and_then(|g| *g)),
880        );
881        let shared_pinhole_lease: Arc<std::sync::RwLock<u32>> = Arc::new(std::sync::RwLock::new(
882            self.pinhole_lease_seconds
883                .read()
884                .map(|g| *g)
885                .unwrap_or(3600),
886        ));
887
888        let state = Arc::new(AppState {
889            handler,
890            node_id: self.cfg.node_id.clone(),
891            region: self.cfg.region.clone().unwrap_or_else(|| "unknown".into()),
892            intent: self.cfg.intent.clone(),
893            model: self.cfg.model.clone().unwrap_or_default(),
894            active_jobs,
895            max_concurrent: self.cfg.max_concurrent,
896            availability: Arc::new(crate::availability::AvailabilityEvaluator::new(
897                self.cfg.availability_windows.clone(),
898            )),
899            idempotency: Arc::new(crate::idempotency::IdempotencyGuard::default()),
900            enable_idempotency: self.cfg.enable_idempotency,
901            peer_manager: Arc::new(crate::peer_manager::PeerManager::with_opts(
902                self.cfg.directory_url.clone(),
903                self.cfg.node_hmac_key.clone(),
904                crate::peer_manager::PeerManagerOpts {
905                    relay_capable: self.cfg.relay_capable,
906                    relay_accept_port: self.cfg.relay_accept_port,
907                },
908            )),
909            http: self.http.clone(),
910            nonce_cache,
911            pinhole_uid: Arc::clone(&shared_pinhole_uid),
912            pinhole_lease_seconds: Arc::clone(&shared_pinhole_lease),
913            #[cfg(feature = "iicp-tcp")]
914            relay_sessions: Arc::new(crate::relay_session::RelaySessionRegistry::new()),
915        });
916
917        // Capture the availability handle before `state` is moved into the router,
918        // so the heartbeat loop below can report effective capacity.
919        let hb_availability = Arc::clone(&state.availability);
920        // Phase 2 mesh: bootstrap + gossip when enabled (before `state` is moved).
921        if self.cfg.enable_mesh {
922            let pm = Arc::clone(&state.peer_manager);
923            let node_id = self.cfg.node_id.clone();
924            let own_endpoint = self.cfg.endpoint.clone();
925            tokio::spawn(async move {
926                pm.start(&node_id, &own_endpoint).await;
927                let interval = pm.gossip_interval();
928                loop {
929                    tokio::time::sleep(interval).await;
930                    pm.gossip_round().await;
931                }
932            });
933        }
934
935        let mut app = Router::new()
936            .route("/v1/task", post(task_endpoint))
937            .route("/iicp/health", get(health_endpoint))
938            .route("/metrics", get(metrics_endpoint));
939        if self.cfg.enable_mesh {
940            app = app.route("/v1/peers", post(peers_endpoint));
941        }
942        if self.cfg.relay_capable {
943            app = app.route("/v1/relay", post(relay_endpoint));
944        }
945        // R1: capture relay_sessions Arc before state is moved into the router.
946        #[cfg(feature = "iicp-tcp")]
947        let relay_sessions_arc = Arc::clone(&state.relay_sessions);
948        let app = app.with_state(state);
949
950        let addr: SocketAddr = addr
951            .parse()
952            .map_err(|e| IicpError::Node(format!("invalid addr: {e}")))?;
953        let listener = TcpListener::bind(addr)
954            .await
955            .map_err(|e| IicpError::Node(e.to_string()))?;
956
957        tracing::info!("IICP node {} listening on {}", self.cfg.node_id, addr);
958
959        if let Some(token) = node_token {
960            let node_id = self.cfg.node_id.clone();
961            let dir = self.cfg.directory_url.clone();
962            let http = self.http.clone();
963            let avail = Arc::clone(&hb_availability);
964            let max_c = self.cfg.max_concurrent;
965            tokio::spawn(async move {
966                loop {
967                    tokio::time::sleep(Duration::from_secs(HEARTBEAT_INTERVAL_SECS)).await;
968                    if let Err(e) = http
969                        // /v1/heartbeat — see heartbeat() above for the doubled-prefix
970                        // history. Same fix applied here in the background loop.
971                        .post(format!("{}/v1/heartbeat", dir.trim_end_matches('/')))
972                        .bearer_auth(&token)
973                        .json(&json!({
974                            "node_id": &node_id,
975                            "node_token": &token,
976                            "status": "available",
977                            // Live capacity after availability shaping (ADR-006).
978                            "max_concurrent": avail.effective_max_concurrent(max_c),
979                        }))
980                        .send()
981                        .await
982                    {
983                        tracing::warn!("heartbeat failed: {e}");
984                    }
985                }
986            });
987        }
988
989        // #343 — pinhole renewal task: extends the UPnP IPv6 firewall pinhole at lease/2.
990        #[cfg(feature = "nat")]
991        {
992            let uid_arc = Arc::clone(&shared_pinhole_uid);
993            let lease_arc = Arc::clone(&shared_pinhole_lease);
994            tokio::spawn(async move {
995                loop {
996                    let (_uid, lease) = {
997                        let u = uid_arc.read().ok().and_then(|g| *g);
998                        let l = lease_arc.read().map(|g| *g).unwrap_or(3600);
999                        (u, l)
1000                    };
1001                    let delay = Duration::from_secs(u64::from((lease / 2).max(60)));
1002                    tokio::time::sleep(delay).await;
1003                    let uid = match uid_arc.read().ok().and_then(|g| *g) {
1004                        Some(u) => u,
1005                        None => return,
1006                    };
1007                    let ok = crate::nat_detection::renew_ipv6_pinhole(uid, lease).await;
1008                    if ok {
1009                        tracing::debug!("UPnP IPv6 pinhole uid={uid} renewed (lease={lease}s)");
1010                    } else {
1011                        tracing::warn!("UPnP IPv6 pinhole uid={uid} renewal failed — will retry");
1012                    }
1013                }
1014            });
1015        }
1016
1017        // R1: start RelayAcceptServer when relay-capable (#341)
1018        #[cfg(feature = "iicp-tcp")]
1019        if self.cfg.relay_capable {
1020            let relay_reg = relay_sessions_arc;
1021            let relay_host_str = bind_host.clone();
1022            let relay_port = self.cfg.relay_accept_port;
1023            tokio::spawn(async move {
1024                let srv = Arc::new(crate::relay_session::RelayAcceptServer::new(
1025                    (*relay_reg).clone(),
1026                    relay_host_str,
1027                    relay_port,
1028                ));
1029                if let Err(e) = srv.serve().await {
1030                    tracing::warn!("Relay accept server error: {e}");
1031                }
1032            });
1033        }
1034
1035        // R2: start relay worker client if relay_worker_endpoint is configured (#341)
1036        #[cfg(feature = "iicp-tcp")]
1037        if let Some(ref ep) = self.cfg.relay_worker_endpoint {
1038            let ep = ep.clone();
1039            let node_id = self.cfg.node_id.clone();
1040            let intent = self.cfg.intent.clone();
1041            let models = self.cfg.model.clone().map(|m| vec![m]).unwrap_or_default();
1042            let handler_fn: crate::relay_worker_client::RelayHandlerFn =
1043                Arc::new(move |task: Value| {
1044                    let h = Arc::clone(&handler_for_relay);
1045                    Box::pin(async move {
1046                        let req = crate::node::TaskRequest {
1047                            task_id: task
1048                                .get("task_id")
1049                                .and_then(|v| v.as_str())
1050                                .unwrap_or("")
1051                                .to_string(),
1052                            intent: task
1053                                .get("intent")
1054                                .and_then(|v| v.as_str())
1055                                .unwrap_or("")
1056                                .to_string(),
1057                            payload: task.get("payload").cloned().unwrap_or(Value::Null),
1058                            constraints: task.get("constraints").cloned(),
1059                            auth: task.get("auth").cloned(),
1060                            nonce: None,
1061                            _trace: None,
1062                        };
1063                        h(req)
1064                            .await
1065                            .unwrap_or_else(|e| json!({"error": e.to_string()}))
1066                    })
1067                });
1068            let (rhost, rport) = {
1069                if let Some(pos) = ep.rfind(':') {
1070                    let port = ep[pos + 1..].parse::<u16>().unwrap_or(9485);
1071                    (ep[..pos].to_string(), port)
1072                } else {
1073                    (ep.clone(), 9485u16)
1074                }
1075            };
1076            // on_bind: re-register with the relay's public endpoint so the node
1077            // appears ACTIVE in directory + stats (#358).
1078            let http_client = self.http.clone();
1079            let dir_url = self.cfg.directory_url.clone();
1080            let on_bind_cb: crate::relay_worker_client::OnBindFn = Arc::new(
1081                move |rh: String, rp: u16, _wid: String| {
1082                    let http = http_client.clone();
1083                    let dir = dir_url.clone();
1084                    Box::pin(async move {
1085                        // A full re-register would require the IicpNode reference here,
1086                        // which isn't available. For v0.7.0 we log the bind event.
1087                        // The node operator should use the cli bin which has the full
1088                        // context to re-register. Full wiring tracked in #341 R2.
1089                        tracing::info!(
1090                            "Relay worker bound to relay {}:{} — update directory registration to use relay endpoint",
1091                            rh, rp,
1092                        );
1093                        let _ = (http, dir); // suppress unused warnings
1094                    })
1095                },
1096            );
1097            tokio::spawn(async move {
1098                let rwc = Arc::new(
1099                    crate::relay_worker_client::RelayWorkerClient::new(
1100                        node_id, intent, rhost, rport, handler_fn, models,
1101                    )
1102                    .with_on_bind(on_bind_cb),
1103                );
1104                rwc.run().await;
1105            });
1106        }
1107
1108        axum::serve(listener, app)
1109            .await
1110            .map_err(|e| IicpError::Node(e.to_string()))
1111    }
1112}