Skip to main content

iicp_client/
node.rs

1// SPDX-License-Identifier: Apache-2.0
2//! IICP provider node — registration, heartbeats, and task serving.
3//!
4//! Implements:
5//! - `GET  /iicp/health`   — liveness / capacity (always 200)
6//! - `GET  /metrics`       — Prometheus text (503 if `metrics` feature absent)
7//! - `POST /v1/task`       — task handler with concurrency gate (IICP-E021),
8//!   nonce replay protection (IICP-E011), and W3C traceparent propagation.
9
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use axum::{
17    extract::State,
18    http::{HeaderMap, StatusCode},
19    response::{IntoResponse, Response},
20    routing::{get, post},
21    Json, Router,
22};
23use reqwest::Client;
24use serde::{Deserialize, Serialize};
25use serde_json::{json, Value};
26use socket2::{Domain, Protocol, Socket, Type};
27use tokio::net::TcpListener;
28use tokio::sync::Mutex;
29
30use crate::errors::{IicpError, Result};
31
/// Directory API base URL that [`NodeConfig::new`] installs as `directory_url`.
const DEFAULT_DIRECTORY: &str = "https://iicp.network/api";
/// Heartbeat period in seconds (the heartbeat loop itself lives further down the file).
const HEARTBEAT_INTERVAL_SECS: u64 = 30;
/// How long, in seconds, a seen nonce stays in the replay cache (five minutes).
const NONCE_TTL_SECS: u64 = 5 * 60;
35
36/// #404 — re-register: POST the register payload and return the fresh `node_token`.
37/// Extracted from the heartbeat loop's re-register arm so the self-heal behaviour
38/// is unit-testable (the 30s interval loop itself is not).
39async fn reregister(http: &Client, url: &str, payload: &serde_json::Value) -> Option<String> {
40    let resp = http.post(url).json(payload).send().await.ok()?;
41    if !resp.status().is_success() {
42        return None;
43    }
44    let data = resp.json::<serde_json::Value>().await.ok()?;
45    data["node_token"]
46        .as_str()
47        .or_else(|| data["token"].as_str())
48        .map(String::from)
49}
50
/// #409 — map a backend model name to the IICP intent it should advertise.
///
/// A name containing `embed` (case-insensitively) is treated as an embedding
/// model and gets the embedding intent URN; any other name gets
/// `default_intent` unchanged. Only embeddings are split out on purpose: that
/// is the one verified real-world case (a backend serving a chat model next
/// to an `*-embed-*` model).
fn intent_for_model(model: &str, default_intent: &str) -> String {
    const EMBEDDING_INTENT: &str = "urn:iicp:intent:llm:embedding:v1";

    let is_embedding = model.to_lowercase().contains("embed");
    let intent = if is_embedding {
        EMBEDDING_INTENT
    } else {
        default_intent
    };
    intent.to_owned()
}
63
/// #408 / ADR-046 (B1/#414 — audio-in added) — input modalities a backend model
/// accepts, guessed conservatively from its (lower-cased) name.
///
/// * `"text"` is always present and always first.
/// * `"image"` is added when the name contains `-vl-`, ends with `-vl`, or
///   contains `vision`, `llava` or `omni`.
/// * `"audio"` is added when the name contains `audio`, `voxtral` or `omni`.
///
/// These are modalities of chat, not separate intents (ADR-046). The result is
/// ordered text, image, audio.
fn modalities_for_model(model: &str) -> Vec<&'static str> {
    /// True when `name` contains at least one of `markers`.
    fn mentions(name: &str, markers: &[&str]) -> bool {
        markers.iter().any(|marker| name.contains(*marker))
    }

    let name = model.to_lowercase();
    // "omni" models take both images and audio.
    let omni = name.contains("omni");
    let sees_images =
        omni || name.ends_with("-vl") || mentions(&name, &["-vl-", "vision", "llava"]);
    let hears_audio = omni || mentions(&name, &["audio", "voxtral"]);

    std::iter::once("text")
        .chain(sees_images.then(|| "image"))
        .chain(hears_audio.then(|| "audio"))
        .collect()
}
87
88/// #409 + #408 — group detected backend models into one capability object per
89/// (intent, input_modalities), so a single node advertises every intent its
90/// backend can serve (chat + embedding) AND distinguishes text-only vs
91/// image-capable (vision) chat. The directory accepts a multi-element
92/// `capabilities` array; clients pick the per-(intent,modality) model from
93/// discover. Back-compatible: a single text chat model yields the same single
94/// `["text"]` capability as before. Order: first-seen group leads (configured
95/// model — typically chat/text — first).
96fn build_capabilities(models: &[String], default_intent: &str, max_tokens: u32) -> Vec<Value> {
97    if models.is_empty() {
98        return vec![json!({
99            "intent": default_intent, "models": [], "max_tokens": max_tokens,
100            "input_modalities": ["text"],
101        })];
102    }
103    // Group key = "intent\0modalities" to keep (intent, modality) groups distinct + ordered.
104    let mut order: Vec<String> = Vec::new();
105    let mut groups: HashMap<String, (String, Vec<&'static str>, Vec<String>)> = HashMap::new();
106    for m in models {
107        let intent = intent_for_model(m, default_intent);
108        let modalities = modalities_for_model(m);
109        let key = format!("{intent}\u{0}{}", modalities.join(","));
110        let entry = groups.entry(key.clone()).or_insert_with(|| {
111            order.push(key.clone());
112            (intent.clone(), modalities.clone(), Vec::new())
113        });
114        if !entry.2.contains(m) {
115            entry.2.push(m.clone());
116        }
117    }
118    order
119        .into_iter()
120        .map(|key| {
121            let (intent, modalities, models) = groups.remove(&key).expect("key from order");
122            json!({
123                "intent": intent,
124                "models": models,
125                "max_tokens": max_tokens,
126                "input_modalities": modalities,
127            })
128        })
129        .collect()
130}
131
132/// Configuration for an IICP provider node.
133#[derive(Debug, Clone)]
134pub struct NodeConfig {
135    pub node_id: String,
136    pub endpoint: String,
137    pub intent: String,
138    pub model: Option<String>,
139    /// Detected backend server flavor advertised at register (node-detail field):
140    /// `ollama` / `lmstudio` / `vllm` / `llamacpp` / `anthropic` / `custom`.
141    pub backend: Option<String>,
142    pub region: Option<String>,
143    pub capabilities: Vec<String>,
144    pub directory_url: String,
145    pub timeout_ms: u64,
146    /// Maximum concurrent tasks; excess requests receive 429 IICP-E021.
147    pub max_concurrent: usize,
148    /// Tokens-per-minute capacity declared to directory (`limits.tokens_per_min`).
149    pub tokens_per_min: u32,
150    /// Per-request token cap declared on the capability object (`capabilities[].max_tokens`).
151    pub max_tokens: u32,
152    /// Optional native IICP binary endpoint (spec/iicp-dir.md v0.7.0).
153    /// Scheme MUST be `iicp://` (plaintext) or `iicpsec://` (TLS).
154    /// Default IICP port is 9484 (ADR-040). When set, the directory persists it
155    /// and clients SHOULD prefer it over `endpoint` for task CALLs.
156    pub transport_endpoint: Option<String>,
157    /// #331 Phase A.1 / ADR-041 — NAT-traversal observability fields surfaced
158    /// to the directory in the register payload. Populated by
159    /// [`IicpNode::apply_nat_profile`] when an operator runs detect_nat at
160    /// startup, OR set manually if the operator already knows their topology.
161    ///
162    /// `transport_method` is one of `direct` / `upnp_mapped` / `stun_hole_punch`
163    /// / `turn_relay` / `external_tunnel` / `unknown`.
164    pub transport_method: Option<String>,
165    /// One of `full_cone` / `restricted_cone` / `port_restricted` / `symmetric`
166    /// / `unknown` (observability only).
167    pub nat_type: Option<String>,
168    /// Forward-compat slot for ADR-041 transport_candidates[] + relay_endpoint.
169    pub transport_metadata: Option<serde_json::Value>,
170    /// ADR-043 §9 — 8-category exposure_mode, computed by `qualify_service` and set
171    /// in `apply_nat_profile`. Surfaced to the directory `nodes.exposure_mode` column (#344).
172    pub exposure_mode: Option<String>,
173    /// S.12 §2.1 CIP policy block surfaced to the directory register payload.
174    /// When `None`, register() falls back to the module-level
175    /// [`crate::cip_policy::get_cip_policy`] — operators can configure once
176    /// and have it apply to all nodes that don't override.
177    pub cip_policy: Option<std::sync::Arc<crate::cip_policy::CooperativeInferencePolicy>>,
178    /// ADR-019 declarative pricing block. When `None`, the SDK does not
179    /// advertise pricing and the directory defaults to a 1.0 multiplier.
180    pub pricing: Option<crate::pricing::PricingConfig>,
181    /// Operator-provisioned HMAC key for ADR-019 pricing signatures. When
182    /// empty, the SDK captures the directory-issued key from the register
183    /// response and uses it for subsequent signing.
184    pub node_hmac_key: String,
185    /// Phase 3+ availability windows (ADR-006). Local-time "HH:MM" windows that
186    /// shape the effective capacity advertised to the directory and gated at
187    /// serve time. Empty → always full capacity. See [`crate::availability`].
188    pub availability_windows: Vec<crate::availability::Window>,
189    /// ADR-010 task_id idempotency. `false` by default to preserve the pre-0.6
190    /// contract (a task_id may be resubmitted). When `true`, a duplicate task_id
191    /// within the 5-minute window is rejected with IICP-E010.
192    pub enable_idempotency: bool,
193    /// Phase 2 mesh (ADR-009/022). When `true`, serve() gossips peers and exposes
194    /// POST /v1/peers. Default false.
195    pub enable_mesh: bool,
196    /// When `true`, serve() exposes POST /v1/relay to forward tasks to peers learned
197    /// via gossip (ADR-022). Requires `enable_mesh`. Default false.
198    pub relay_capable: bool,
199    /// Port for the RelayAcceptServer (R1 relay-as-last-resort, #341).
200    /// Workers behind CGNAT connect here outbound and send RELAY_BIND. Default 9485.
201    pub relay_accept_port: u16,
202    /// R2: when set, this node acts as a relay WORKER — connects outbound to the
203    /// specified relay endpoint. Format: "host:port" (e.g. "relay.example.com:9485").
204    pub relay_worker_endpoint: Option<String>,
205    /// Directory for persistent log files (`<node_id>.log` + `events.jsonl`).
206    /// `None` disables file logging (stderr only). Overridden by `IICP_LOG_DIR`.
207    pub log_dir: Option<std::path::PathBuf>,
208}
209
210impl NodeConfig {
211    pub fn new(
212        node_id: impl Into<String>,
213        endpoint: impl Into<String>,
214        intent: impl Into<String>,
215    ) -> Self {
216        Self {
217            node_id: node_id.into(),
218            endpoint: endpoint.into(),
219            intent: intent.into(),
220            model: None,
221            backend: None,
222            region: None,
223            capabilities: vec![],
224            directory_url: DEFAULT_DIRECTORY.into(),
225            timeout_ms: 5_000,
226            max_concurrent: 4,
227            tokens_per_min: 10_000,
228            max_tokens: 8_192,
229            transport_endpoint: None,
230            transport_method: None,
231            nat_type: None,
232            transport_metadata: None,
233            exposure_mode: None,
234            cip_policy: None,
235            pricing: None,
236            node_hmac_key: String::new(),
237            availability_windows: Vec::new(),
238            enable_idempotency: false,
239            enable_mesh: false,
240            relay_capable: false,
241            relay_accept_port: 9485,
242            relay_worker_endpoint: None,
243            log_dir: None,
244        }
245    }
246}
247
248#[derive(Debug, Deserialize)]
249pub struct TaskRequest {
250    pub task_id: String,
251    pub intent: String,
252    pub payload: Value,
253    pub constraints: Option<Value>,
254    pub auth: Option<Value>,
255    pub nonce: Option<String>,
256    /// Injected server-side from the W3C `traceparent` header — not from the JSON body.
257    #[serde(skip_deserializing)]
258    pub _trace: Option<Value>,
259}
260
261#[derive(Debug, Serialize)]
262pub struct TaskResponse {
263    pub task_id: String,
264    pub status: String,
265    #[serde(skip_serializing_if = "Option::is_none")]
266    pub result: Option<Value>,
267    #[serde(skip_serializing_if = "Option::is_none")]
268    pub error: Option<Value>,
269}
270
271pub type TaskHandlerFn = Arc<
272    dyn Fn(
273            TaskRequest,
274        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send>>
275        + Send
276        + Sync,
277>;
278
279struct AppState {
280    handler: TaskHandlerFn,
281    node_id: String,
282    region: String,
283    intent: String,
284    model: String,
285    active_jobs: Arc<AtomicUsize>,
286    /// Incremental task success/failure counters reset on each heartbeat.
287    tasks_success: Arc<AtomicUsize>,
288    tasks_failed: Arc<AtomicUsize>,
289    max_concurrent: usize,
290    availability: Arc<crate::availability::AvailabilityEvaluator>,
291    /// #403 — CIP per-task admission policy (tool-execution gate).
292    cip_policy: Arc<crate::cip_policy::CooperativeInferencePolicy>,
293    idempotency: Arc<crate::idempotency::IdempotencyGuard>,
294    enable_idempotency: bool,
295    peer_manager: Arc<crate::peer_manager::PeerManager>,
296    http: reqwest::Client,
297    nonce_cache: Arc<Mutex<HashMap<String, Instant>>>,
298    /// #343 — shared pinhole state for /iicp/health surface.
299    pinhole_uid: Arc<std::sync::RwLock<Option<u32>>>,
300    pinhole_lease_seconds: Arc<std::sync::RwLock<u32>>,
301    /// R1 relay-as-last-resort (#341): sessions from workers binding outbound.
302    #[cfg(feature = "iicp-tcp")]
303    relay_sessions: Arc<crate::relay_session::RelaySessionRegistry>,
304}
305
306// ── GET /iicp/health ─────────────────────────────────────────────────────────
307
308async fn health_endpoint(State(state): State<Arc<AppState>>) -> impl IntoResponse {
309    let active = state.active_jobs.load(Ordering::Relaxed);
310    let uid = state.pinhole_uid.read().ok().and_then(|g| *g);
311    let lease = state
312        .pinhole_lease_seconds
313        .read()
314        .map(|g| *g)
315        .unwrap_or(3600);
316    let pinhole_state = if let Some(uid) = uid {
317        json!({ "active": true, "unique_id": uid, "lease_seconds": lease })
318    } else {
319        json!({ "active": false })
320    };
321    let eff_max = state
322        .availability
323        .effective_max_concurrent(state.max_concurrent);
324    Json(json!({
325        "status": "ok",
326        "node_id": state.node_id,
327        "region": state.region,
328        "load": (active as f64 / state.max_concurrent.max(1) as f64),
329        "active_jobs": active,
330        "max_concurrent": state.max_concurrent,
331        "effective_max_concurrent": eff_max,
332        "available": active < eff_max,
333        "model": state.model,
334        "intent": state.intent,
335        "pinhole_state": pinhole_state,
336    }))
337}
338
339// ── GET /metrics ─────────────────────────────────────────────────────────────
340
341async fn metrics_endpoint() -> Response {
342    #[cfg(feature = "metrics")]
343    {
344        use prometheus::{Encoder, TextEncoder};
345        let encoder = TextEncoder::new();
346        let mf = prometheus::gather();
347        let mut buf = Vec::new();
348        if encoder.encode(&mf, &mut buf).is_ok() {
349            return (
350                StatusCode::OK,
351                [(
352                    axum::http::header::CONTENT_TYPE,
353                    "text/plain; version=0.0.4",
354                )],
355                buf,
356            )
357                .into_response();
358        }
359    }
360    (
361        StatusCode::SERVICE_UNAVAILABLE,
362        "metrics feature not enabled",
363    )
364        .into_response()
365}
366
367// ── POST /v1/peers (ADR-009 gossip exchange) ──────────────────────────────────
368
369async fn peers_endpoint(
370    State(state): State<Arc<AppState>>,
371    headers: HeaderMap,
372    body: axum::body::Bytes,
373) -> Response {
374    let sig = headers
375        .get("x-iicp-signature")
376        .and_then(|v| v.to_str().ok());
377    if !state.peer_manager.verify_exchange(&body, sig) {
378        return (
379            StatusCode::UNAUTHORIZED,
380            Json(json!({"error":{"code":"IICP-E012","message":"invalid_signature"}})),
381        )
382            .into_response();
383    }
384    if let Ok(parsed) = serde_json::from_slice::<Value>(&body) {
385        if let Some(arr) = parsed.get("known_peers").and_then(Value::as_array) {
386            let dicts: Vec<Value> = arr.iter().filter(|p| p.is_object()).cloned().collect();
387            state.peer_manager.merge_peers(&dicts);
388        }
389    }
390    let peers: Vec<Value> = state
391        .peer_manager
392        .get_peers()
393        .iter()
394        .map(|p| {
395            json!({
396                "node_id": p.node_id,
397                "endpoint": p.endpoint,
398                "region": p.region,
399                "last_seen": p.last_seen,
400            })
401        })
402        .collect();
403    Json(json!({ "peers": peers })).into_response()
404}
405
406// ── POST /v1/relay (ADR-022 mesh relay) ───────────────────────────────────────
407
408async fn relay_endpoint(
409    State(state): State<Arc<AppState>>,
410    Json(payload): Json<Value>,
411) -> Response {
412    let target_id = payload
413        .get("target_node_id")
414        .and_then(Value::as_str)
415        .unwrap_or("");
416    let task = payload.get("task");
417    if target_id.is_empty() || task.is_none() {
418        return (
419            StatusCode::UNPROCESSABLE_ENTITY,
420            Json(
421                json!({"error":{"code":"IICP-E000","message":"target_node_id and task required"}}),
422            ),
423        )
424            .into_response();
425    }
426    let task_val = task.expect("checked above").clone();
427
428    // R1: check relay session registry first (CGNAT workers with no inbound endpoint)
429    #[cfg(feature = "iicp-tcp")]
430    if let Some(session) = state.relay_sessions.get(target_id) {
431        match session.forward_task(&task_val, 120).await {
432            Ok(result) => {
433                let task_id = task_val
434                    .get("task_id")
435                    .and_then(Value::as_str)
436                    .unwrap_or("");
437                return Json(json!({
438                    "task_id": task_id,
439                    "status": "completed",
440                    "result": result
441                }))
442                .into_response();
443            }
444            Err(e) => {
445                return (
446                    StatusCode::BAD_GATEWAY,
447                    Json(json!({"error":{"code":"IICP-E031","message":format!("relay session forward failed: {e}")}})),
448                )
449                    .into_response();
450            }
451        }
452    }
453
454    // Fall back to HTTP forwarding for routable peers (ADR-022)
455    let target = match state.peer_manager.relay_target(target_id) {
456        Some(t) => t,
457        None => {
458            return (
459                StatusCode::NOT_FOUND,
460                Json(json!({"error":{"code":"IICP-E030","message":"target not in peer list and not a bound relay worker"}})),
461            )
462                .into_response();
463        }
464    };
465    let url = format!("{}/v1/task", target.endpoint.trim_end_matches('/'));
466    match state
467        .http
468        .post(&url)
469        .timeout(Duration::from_secs(120))
470        .json(&task_val)
471        .send()
472        .await
473    {
474        Ok(resp) => {
475            let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::OK);
476            let bytes = resp.bytes().await.unwrap_or_default();
477            (status, bytes).into_response()
478        }
479        Err(e) => (
480            StatusCode::BAD_GATEWAY,
481            Json(json!({"error":{"code":"IICP-E031","message":format!("relay failed: {e}")}})),
482        )
483            .into_response(),
484    }
485}
486
487// ── POST /v1/task ─────────────────────────────────────────────────────────────
488
489/// Try to claim a concurrency slot. On `true` the caller owns one increment of
490/// `active_jobs` and MUST `fetch_sub` it on every exit path. realtime/interactive
491/// wait briefly for a slot; other tiers fail fast so the proxy sees back-pressure
492/// immediately (ADR-006; see [`crate::scheduler`]).
493async fn admit(state: &AppState, qos: &str) -> bool {
494    // Effective cap folds in availability windows (ADR-006): a reduced/closed
495    // window lowers capacity below max_concurrent.
496    let cap = state
497        .availability
498        .effective_max_concurrent(state.max_concurrent);
499    let prev = state.active_jobs.fetch_add(1, Ordering::Relaxed);
500    if prev < cap {
501        return true;
502    }
503    state.active_jobs.fetch_sub(1, Ordering::Relaxed);
504    if !crate::scheduler::is_queue_eligible(qos) {
505        return false;
506    }
507    let deadline = Instant::now() + crate::scheduler::QUEUE_WAIT;
508    while Instant::now() < deadline {
509        tokio::time::sleep(Duration::from_millis(50)).await;
510        let cap = state
511            .availability
512            .effective_max_concurrent(state.max_concurrent);
513        let prev = state.active_jobs.fetch_add(1, Ordering::Relaxed);
514        if prev < cap {
515            return true;
516        }
517        state.active_jobs.fetch_sub(1, Ordering::Relaxed);
518    }
519    false
520}
521
522async fn task_endpoint(
523    State(state): State<Arc<AppState>>,
524    headers: HeaderMap,
525    Json(mut req): Json<TaskRequest>,
526) -> Response {
527    // #403 — CIP per-task admission gate (parity with the adapter cip_gate):
528    // reject tool-execution-domain intents unless the operator opted in via
529    // cip_policy.allow_tool_execution. Checked before the QoS slot so a denied
530    // task doesn't consume capacity.
531    if !state.cip_policy.permits_intent(&req.intent) {
532        return (
533            StatusCode::FORBIDDEN,
534            Json(json!({
535                "error": {
536                    "code": "tool_execution_denied",
537                    "message": "Tool-execution intents are not permitted by this node's CIP policy",
538                }
539            })),
540        )
541            .into_response();
542    }
543
544    // QoS-aware admission — IICP-E021
545    let qos = req
546        .constraints
547        .as_ref()
548        .and_then(|c| c.get("qos_class"))
549        .and_then(|v| v.as_str())
550        .unwrap_or("best_effort")
551        .to_string();
552    if !admit(&state, &qos).await {
553        return (
554            StatusCode::TOO_MANY_REQUESTS,
555            [("Retry-After", "2"), ("Content-Type", "application/json")],
556            Json(json!({
557                "error": {
558                    "code": "IICP-E021",
559                    "message": "capacity_exceeded",
560                    "qos_class": qos,
561                    "retry_after_ms": 2000,
562                }
563            })),
564        )
565            .into_response();
566    }
567
568    // Nonce replay protection — IICP-E011
569    if let Some(ref nonce) = req.nonce {
570        let mut cache = state.nonce_cache.lock().await;
571        cache.retain(|_, inserted_at| inserted_at.elapsed().as_secs() < NONCE_TTL_SECS);
572        if cache.contains_key(nonce) {
573            state.active_jobs.fetch_sub(1, Ordering::Relaxed);
574            return (
575                StatusCode::CONFLICT,
576                Json(json!({
577                    "error": { "code": "IICP-E011", "message": "replay_detected" }
578                })),
579            )
580                .into_response();
581        }
582        cache.insert(nonce.clone(), Instant::now());
583    }
584
585    // Idempotency — duplicate task_id within the retry window (ADR-010). Opt-in
586    // (NodeConfig.enable_idempotency) to preserve the pre-0.6 contract.
587    if state.enable_idempotency && !state.idempotency.check_and_register(&req.task_id) {
588        state.active_jobs.fetch_sub(1, Ordering::Relaxed);
589        return (
590            StatusCode::CONFLICT,
591            Json(json!({
592                "error": { "code": "IICP-E010", "message": "duplicate_task" }
593            })),
594        )
595            .into_response();
596    }
597
598    // W3C traceparent propagation
599    if let Some(tp) = headers.get("traceparent").and_then(|v| v.to_str().ok()) {
600        req._trace = Some(json!({ "traceparent": tp }));
601    }
602
603    let task_id = req.task_id.clone();
604    // ADR-014 TRACE-02 — iicp.task.execute span via `tracing` crate.
605    // `tracing-opentelemetry` bridge propagates this to an OTLP collector when
606    // OTEL_EXPORTER_OTLP_ENDPOINT is set and the operator configures the bridge
607    // at startup (e.g. via opentelemetry-otlp + tracing-opentelemetry).
608    let result = {
609        let span = tracing::info_span!(
610            "iicp.task.execute",
611            "iicp.task_id" = %task_id,
612            "iicp.intent" = %req.intent,
613        );
614        let _guard = span.enter();
615        (state.handler)(req).await
616    };
617    state.active_jobs.fetch_sub(1, Ordering::Relaxed);
618
619    match result {
620        Ok(value) => {
621            state.tasks_success.fetch_add(1, Ordering::Relaxed);
622            Json(TaskResponse {
623                task_id,
624                status: "completed".into(),
625                result: Some(value),
626                error: None,
627            })
628            .into_response()
629        }
630        Err(e) => {
631            state.tasks_failed.fetch_add(1, Ordering::Relaxed);
632            (
633                StatusCode::INTERNAL_SERVER_ERROR,
634                Json(TaskResponse {
635                    task_id,
636                    status: "error".into(),
637                    result: None,
638                    error: Some(json!({ "message": e.to_string() })),
639                }),
640            )
641                .into_response()
642        }
643    }
644}
645
646// ── IicpNode ──────────────────────────────────────────────────────────────────
647
648/// IICP provider node — handles registration, heartbeats, and task serving.
649pub struct IicpNode {
650    cfg: NodeConfig,
651    http: Client,
652    /// ADR-019 HMAC key used for signing pricing declarations. Initialized
653    /// from `cfg.node_hmac_key`; populated from the directory's response on
654    /// first register() so subsequent re-registrations sign with the
655    /// directory-issued key.
656    runtime_hmac_key: std::sync::RwLock<String>,
657    /// BUG-5: token stashed by register() so deregister()/heartbeat don't need it re-passed.
658    /// Arc so the background heartbeat task can update it after a re-registration (#399).
659    runtime_token: Arc<std::sync::RwLock<String>>,
660    /// #343 — UPnP IPv6 pinhole UID captured by `apply_nat_profile`, revoked
661    /// on shutdown via [`Self::revoke_pinhole`]. Only read under the `nat`
662    /// feature; allowed dead_code so non-nat builds compile cleanly.
663    #[allow(dead_code)]
664    pinhole_uid: std::sync::RwLock<Option<u32>>,
665    #[allow(dead_code)]
666    pinhole_lease_seconds: std::sync::RwLock<u32>,
667    /// ADR-047 Part A (#411) — latest liveness nonce from the heartbeat response,
668    /// answered (HMAC) on the next beat. None until the first response.
669    liveness_challenge: std::sync::RwLock<Option<String>>,
670}
671
672impl IicpNode {
673    pub fn new(cfg: NodeConfig) -> Self {
674        let http = Client::builder()
675            .timeout(Duration::from_millis(cfg.timeout_ms + 2_000))
676            .use_rustls_tls()
677            .build()
678            .expect("failed to build HTTP client");
679        let runtime_hmac_key = std::sync::RwLock::new(cfg.node_hmac_key.clone());
680        Self {
681            cfg,
682            http,
683            runtime_hmac_key,
684            runtime_token: Arc::new(std::sync::RwLock::new(String::new())),
685            pinhole_uid: std::sync::RwLock::new(None),
686            pinhole_lease_seconds: std::sync::RwLock::new(3600),
687            liveness_challenge: std::sync::RwLock::new(None),
688        }
689    }
690
691    /// Current HMAC key in use for ADR-019 pricing signatures (empty if
692    /// unregistered AND no operator-provisioned key).
693    pub fn node_hmac_key(&self) -> String {
694        self.runtime_hmac_key.read().expect("poisoned").clone()
695    }
696
697    /// Borrow this node's configuration. Useful for callers (e.g.
698    /// [`crate::conformance::run_conformance_checks`]) that need to inspect
699    /// `directory_url`, `endpoint`, or `node_id` without owning the config.
700    pub fn cfg(&self) -> &NodeConfig {
701        &self.cfg
702    }
703
704    /// Set the relay-worker endpoint after construction. Used by the CLI when a
705    /// relay is auto-elected post-NAT-detection (tier ≥ 3): `serve()` reads
706    /// `self.cfg.relay_worker_endpoint` to start the outbound relay session.
707    pub fn set_relay_worker_endpoint(&mut self, endpoint: String) {
708        self.cfg.relay_worker_endpoint = Some(endpoint);
709    }
710
711    /// Populate `endpoint`, `transport_endpoint`, and the NAT observability
712    /// fields from a `NatProfile` produced by [`crate::nat_detection::detect_nat`].
713    ///
714    /// Operators typically call this right after `detect_nat()` and before
715    /// `register()` so the directory receives the discovered public endpoint
716    /// + transport_method/nat_type/transport_metadata in the same payload.
717    ///
718    /// Defensive: tier-4 (unreachable) profiles do NOT overwrite a manually-
719    /// set endpoint, and `transport_method == "unreachable"` is filtered out
720    /// before register.
721    #[cfg(feature = "nat")]
722    pub fn apply_nat_profile(&mut self, profile: &crate::nat_detection::NatProfile) {
723        if profile.is_reachable() {
724            if let Some(pub_ep) = &profile.public_endpoint {
725                self.cfg.endpoint = pub_ep.clone();
726            }
727        }
728        if let Some(tep) = &profile.transport_endpoint {
729            self.cfg.transport_endpoint = Some(tep.clone());
730        }
731        let tm = match profile.transport_method {
732            crate::nat_detection::TransportMethod::Direct => Some("direct"),
733            crate::nat_detection::TransportMethod::UpnpMapped => Some("upnp_mapped"),
734            crate::nat_detection::TransportMethod::StunHolePunch => Some("stun_hole_punch"),
735            crate::nat_detection::TransportMethod::TurnRelay => Some("turn_relay"),
736            crate::nat_detection::TransportMethod::ExternalTunnel => Some("external_tunnel"),
737            crate::nat_detection::TransportMethod::Unreachable => None,
738        };
739        if let Some(name) = tm {
740            self.cfg.transport_method = Some(name.into());
741        }
742        if self.cfg.nat_type.is_none() {
743            self.cfg.nat_type = Some("unknown".into());
744        }
745        let tail: Vec<&str> = profile
746            .detection_log
747            .iter()
748            .rev()
749            .take(1)
750            .map(|s| s.as_str())
751            .collect();
752        self.cfg.transport_metadata = Some(serde_json::json!({
753            "tier": profile.tier,
754            "detection_log_tail": tail,
755        }));
756        // ADR-043 §9 (#344) — derive the canonical 8-category exposure_mode and
757        // advertise it so the directory can store nodes.exposure_mode for routing.
758        self.cfg.exposure_mode = Some(
759            crate::qualify::qualify_service(profile)
760                .exposure_mode
761                .to_string(),
762        );
763        // #343 — capture the IPv6 firewall pinhole UID and lease so we can renew and revoke.
764        if let Some(v6) = &profile.ipv6 {
765            if v6.pinhole_active {
766                if let Some(uid) = v6.pinhole_unique_id {
767                    if let Ok(mut slot) = self.pinhole_uid.write() {
768                        *slot = Some(uid);
769                    }
770                }
771                if let Some(lease) = v6.pinhole_lease_seconds {
772                    if let Ok(mut slot) = self.pinhole_lease_seconds.write() {
773                        *slot = lease;
774                    }
775                }
776            }
777        }
778    }
779
780    /// #343 — close the UPnP IPv6 firewall pinhole if one is tracked. Best-effort.
781    #[cfg(feature = "nat")]
782    pub async fn revoke_pinhole(&self) -> bool {
783        let uid = match self.pinhole_uid.write() {
784            Ok(mut slot) => slot.take(),
785            Err(_) => None,
786        };
787        match uid {
788            Some(uid) => crate::nat_detection::delete_ipv6_pinhole(uid).await,
789            None => false,
790        }
791    }
792
793    /// Tell the directory this node is going away.
794    ///
795    /// Mirrors `iicp_client.IicpNode.deregister` (Python iter-1471) and
796    /// `IicpNode.deregister` (TS iter-1474). Best-effort: shutdown paths
797    /// swallow failures so a flaky directory connection doesn't block exit.
798    /// Deregister from the directory. `node_token` defaults to the token stashed by
799    /// `register()` (BUG-5) when `None` — pass `Some(token)` to override.
800    pub async fn deregister(&self, node_token: Option<&str>) -> Result<()> {
801        let stashed = self.runtime_token.read().expect("poisoned").clone();
802        let token = node_token.map(str::to_string).unwrap_or(stashed);
803        if token.is_empty() {
804            return Err(crate::errors::IicpError::Node(
805                "deregister() requires a node_token (none stashed — call register() first)".into(),
806            ));
807        }
808        let url = format!(
809            "{}/v1/register",
810            self.cfg.directory_url.trim_end_matches('/')
811        );
812        let resp = self
813            .http
814            .delete(&url)
815            .bearer_auth(&token)
816            .json(&serde_json::json!({"node_id": self.cfg.node_id}))
817            .send()
818            .await?;
819        let status = resp.status();
820        if !status.is_success() && status.as_u16() != 404 {
821            return Err(crate::errors::IicpError::Node(format!(
822                "Deregister failed: {status}"
823            )));
824        }
825        Ok(())
826    }
827
828    /// Register with the directory and return the assigned `node_token`.
829    ///
830    /// Payload conforms to spec/iicp-dir.md §3.1 REGISTER plus the v0.7.0
831    /// dual-endpoint extension (`transport_endpoint`). Pre-iter-1413
832    /// builds sent a non-spec flat-`intent` shape that the production
833    /// directory rejects with 422; fixed here.
834    /// Build the spec-compliant REGISTER payload (iicp-dir §3.1 + v0.7.0
835    /// dual-endpoint). Extracted so the background heartbeat task can re-POST
836    /// the same payload to recover after the directory drops the node (#399).
837    fn build_register_payload(&self) -> Value {
838        // Build the spec-compliant capability object. Legacy
839        // `capabilities: Vec<String>` is folded into the models array.
840        let mut models: Vec<String> = match &self.cfg.model {
841            Some(m) => vec![m.clone()],
842            None => Vec::new(),
843        };
844        for cap in &self.cfg.capabilities {
845            if !models.contains(cap) {
846                models.push(cap.clone());
847            }
848        }
849        let region = self
850            .cfg
851            .region
852            .clone()
853            .unwrap_or_else(|| "eu-central".to_string());
854
855        let mut payload = json!({
856            "endpoint": self.cfg.endpoint,
857            "region": region,
858            // #409 — advertise one capability object per intent the backend can
859            // serve (e.g. chat + embedding from one Ollama/LM Studio backend),
860            // classified from the detected model set, instead of a single intent.
861            "capabilities": build_capabilities(&models, &self.cfg.intent, self.cfg.max_tokens),
862            "limits": {
863                "max_concurrent": self.cfg.max_concurrent,
864                "tokens_per_min": self.cfg.tokens_per_min,
865            },
866        });
867        if !self.cfg.node_id.is_empty() {
868            payload["node_id"] = json!(self.cfg.node_id);
869        }
870        if let Some(t) = &self.cfg.transport_endpoint {
871            payload["transport_endpoint"] = json!(t);
872        }
873        if let Some(m) = &self.cfg.transport_method {
874            payload["transport_method"] = json!(m);
875        }
876        if let Some(n) = &self.cfg.nat_type {
877            payload["nat_type"] = json!(n);
878        }
879        if let Some(md) = &self.cfg.transport_metadata {
880            payload["transport_metadata"] = md.clone();
881        }
882        if let Some(e) = &self.cfg.exposure_mode {
883            payload["exposure_mode"] = json!(e);
884        }
885        payload["sdk_language"] = json!("rust");
886        payload["sdk_version"] = json!(env!("CARGO_PKG_VERSION"));
887        if let Some(b) = &self.cfg.backend {
888            payload["backend"] = json!(b);
889        }
890        let policy_arc = self
891            .cfg
892            .cip_policy
893            .clone()
894            .unwrap_or_else(crate::cip_policy::get_cip_policy);
895        if let Some(block) = policy_arc.as_register_policy_block() {
896            payload["policy"] = block;
897        }
898        if let Some(pricing) = &self.cfg.pricing {
899            let hmac_key = self.runtime_hmac_key.read().expect("poisoned").clone();
900            payload["pricing"] = crate::pricing::build_pricing_block(pricing, &hmac_key);
901        }
902        if !self.cfg.node_hmac_key.is_empty() {
903            payload["node_hmac_key"] = json!(self.cfg.node_hmac_key);
904        }
905        payload
906    }
907
908    pub async fn register(&self) -> Result<String> {
909        let payload = self.build_register_payload();
910
911        let resp = self
912            .http
913            .post(format!(
914                "{}/v1/register",
915                self.cfg.directory_url.trim_end_matches('/')
916            ))
917            .json(&payload)
918            .send()
919            .await
920            .map_err(|e| IicpError::Node(e.to_string()))?;
921
922        if !resp.status().is_success() {
923            return Err(IicpError::Node(format!(
924                "register failed: {}",
925                resp.status()
926            )));
927        }
928        let data: Value = resp
929            .json()
930            .await
931            .map_err(|e| IicpError::Node(e.to_string()))?;
932        let token = data["node_token"]
933            .as_str()
934            .or_else(|| data["token"].as_str())
935            .ok_or_else(|| IicpError::Node(format!("no node_token in response: {data}")))?;
936        // BUG-5: stash the token so deregister()/heartbeat don't need it re-passed.
937        *self.runtime_token.write().expect("poisoned") = token.to_string();
938        // ADR-019: capture directory-issued HMAC key for subsequent signing.
939        // Operator-provisioned key (cfg.node_hmac_key) wins — we only set the
940        // runtime key from the response when the operator hasn't set one.
941        if self.cfg.node_hmac_key.is_empty() {
942            if let Some(dir_key) = data["node_hmac_key"].as_str() {
943                if !dir_key.is_empty() {
944                    let mut guard = self.runtime_hmac_key.write().expect("poisoned");
945                    *guard = dir_key.to_string();
946                }
947            }
948        }
949        Ok(token.to_string())
950    }
951
952    /// Send a single heartbeat to the directory.
953    pub async fn heartbeat(&self, node_token: &str) -> Result<()> {
954        let mut body = json!({
955            "node_id": self.cfg.node_id,
956            "node_token": node_token,
957            "status": "available",
958            // Live capacity after availability shaping (ADR-006).
959            "max_concurrent": crate::availability::AvailabilityEvaluator::new(
960                self.cfg.availability_windows.clone(),
961            )
962            .effective_max_concurrent(self.cfg.max_concurrent),
963        });
964        // ADR-047 Part A (#411) — answer the directory's liveness challenge from the
965        // previous beat: HMAC the nonce with node_hmac_key (proves key control with
966        // no dial-back; works for CGNAT/IPv6). No-op until both nonce + key exist.
967        let hmac_key = self.node_hmac_key();
968        let stored = self.liveness_challenge.read().expect("poisoned").clone();
969        if let Some(ch) = &stored {
970            if !hmac_key.is_empty() {
971                body["challenge_response"] =
972                    json!(crate::pricing::sign_body(ch.as_bytes(), &hmac_key));
973            }
974        }
975
976        let resp = self
977            .http
978            // /v1/heartbeat — default directory_url already ends in /api;
979            // the prior /api/v1/heartbeat path doubled the prefix and 404'd,
980            // so last_seen never updated and nodes vanished from /v1/stats.
981            .post(format!(
982                "{}/v1/heartbeat",
983                self.cfg.directory_url.trim_end_matches('/')
984            ))
985            // NodeTokenAuth middleware requires Bearer auth; the body
986            // token is retained for back-compat with older directory builds.
987            .bearer_auth(node_token)
988            .json(&body)
989            .send()
990            .await
991            .map_err(|e| IicpError::Node(e.to_string()))?;
992
993        if !resp.status().is_success() {
994            return Err(IicpError::Node(format!(
995                "heartbeat failed: {}",
996                resp.status()
997            )));
998        }
999        // Capture the fresh nonce to answer on the next beat (ADR-047 Part A).
1000        if let Ok(data) = resp.json::<Value>().await {
1001            if let Some(ch) = data["challenge"].as_str() {
1002                *self.liveness_challenge.write().expect("poisoned") = Some(ch.to_string());
1003            }
1004        }
1005        Ok(())
1006    }
1007
1008    /// Start the task server (blocks until cancelled).
1009    ///
1010    /// Serves `POST /v1/task`, `GET /iicp/health`, `GET /metrics`.
1011    /// Starts a background heartbeat loop when `node_token` is provided.
1012    pub async fn serve<F, Fut>(
1013        &self,
1014        handler: F,
1015        addr: &str,
1016        node_token: Option<String>,
1017    ) -> Result<()>
1018    where
1019        F: Fn(TaskRequest) -> Fut + Send + Sync + 'static,
1020        Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
1021    {
1022        let handler: TaskHandlerFn = Arc::new(move |req| Box::pin(handler(req)));
1023        // Clone before handler is potentially moved into the relay worker closure (iicp-tcp only).
1024        #[cfg(feature = "iicp-tcp")]
1025        let handler_for_relay = Arc::clone(&handler);
1026        // Extract bind host before `addr` is shadowed by SocketAddr (iicp-tcp only).
1027        #[cfg(feature = "iicp-tcp")]
1028        let bind_host: String = addr.split(':').next().unwrap_or("0.0.0.0").to_string();
1029        let active_jobs = Arc::new(AtomicUsize::new(0));
1030        let nonce_cache = Arc::new(Mutex::new(HashMap::new()));
1031        // #343 — shared pinhole state: pass to AppState (health endpoint) and renewal task.
1032        let shared_pinhole_uid: Arc<std::sync::RwLock<Option<u32>>> = Arc::new(
1033            std::sync::RwLock::new(self.pinhole_uid.read().ok().and_then(|g| *g)),
1034        );
1035        let shared_pinhole_lease: Arc<std::sync::RwLock<u32>> = Arc::new(std::sync::RwLock::new(
1036            self.pinhole_lease_seconds
1037                .read()
1038                .map(|g| *g)
1039                .unwrap_or(3600),
1040        ));
1041
1042        let tasks_success = Arc::new(AtomicUsize::new(0));
1043        let tasks_failed = Arc::new(AtomicUsize::new(0));
1044        let state = Arc::new(AppState {
1045            handler,
1046            node_id: self.cfg.node_id.clone(),
1047            region: self.cfg.region.clone().unwrap_or_else(|| "unknown".into()),
1048            intent: self.cfg.intent.clone(),
1049            model: self.cfg.model.clone().unwrap_or_default(),
1050            active_jobs,
1051            tasks_success: Arc::clone(&tasks_success),
1052            tasks_failed: Arc::clone(&tasks_failed),
1053            max_concurrent: self.cfg.max_concurrent,
1054            availability: Arc::new(crate::availability::AvailabilityEvaluator::new(
1055                self.cfg.availability_windows.clone(),
1056            )),
1057            // #403 — resolve the CIP admission policy (cfg override or module default).
1058            cip_policy: self
1059                .cfg
1060                .cip_policy
1061                .clone()
1062                .unwrap_or_else(crate::cip_policy::get_cip_policy),
1063            idempotency: Arc::new(crate::idempotency::IdempotencyGuard::default()),
1064            enable_idempotency: self.cfg.enable_idempotency,
1065            peer_manager: Arc::new(crate::peer_manager::PeerManager::with_opts(
1066                self.cfg.directory_url.clone(),
1067                self.cfg.node_hmac_key.clone(),
1068                crate::peer_manager::PeerManagerOpts {
1069                    relay_capable: self.cfg.relay_capable,
1070                    relay_accept_port: self.cfg.relay_accept_port,
1071                },
1072            )),
1073            http: self.http.clone(),
1074            nonce_cache,
1075            pinhole_uid: Arc::clone(&shared_pinhole_uid),
1076            pinhole_lease_seconds: Arc::clone(&shared_pinhole_lease),
1077            #[cfg(feature = "iicp-tcp")]
1078            relay_sessions: Arc::new(crate::relay_session::RelaySessionRegistry::new()),
1079        });
1080
1081        // Capture the availability handle before `state` is moved into the router,
1082        // so the heartbeat loop below can report effective capacity.
1083        let hb_availability = Arc::clone(&state.availability);
1084        // Phase 2 mesh: bootstrap + gossip when enabled (before `state` is moved).
1085        if self.cfg.enable_mesh {
1086            let pm = Arc::clone(&state.peer_manager);
1087            let node_id = self.cfg.node_id.clone();
1088            let own_endpoint = self.cfg.endpoint.clone();
1089            tokio::spawn(async move {
1090                pm.start(&node_id, &own_endpoint).await;
1091                let interval = pm.gossip_interval();
1092                loop {
1093                    tokio::time::sleep(interval).await;
1094                    pm.gossip_round().await;
1095                }
1096            });
1097        }
1098
1099        let mut app = Router::new()
1100            .route("/v1/task", post(task_endpoint))
1101            .route("/iicp/health", get(health_endpoint))
1102            .route("/metrics", get(metrics_endpoint));
1103        if self.cfg.enable_mesh {
1104            app = app.route("/v1/peers", post(peers_endpoint));
1105        }
1106        if self.cfg.relay_capable {
1107            app = app.route("/v1/relay", post(relay_endpoint));
1108        }
1109        // R1: capture relay_sessions Arc before state is moved into the router.
1110        #[cfg(feature = "iicp-tcp")]
1111        let relay_sessions_arc = Arc::clone(&state.relay_sessions);
1112        let app = app.with_state(state);
1113
1114        let addr: SocketAddr = addr
1115            .parse()
1116            .map_err(|e| IicpError::Node(format!("invalid addr: {e}")))?;
1117
1118        // For IPv6 addresses (including the default :: host), create a dual-stack socket
1119        // so the same listener accepts both IPv4 and IPv6 connections. Linux defaults to
1120        // IPV6_V6ONLY=1 which would silently reject IPv4; setting it to false here gives
1121        // macOS-equivalent behaviour on all platforms.
1122        let listener = if addr.is_ipv6() {
1123            let socket = Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))
1124                .map_err(|e| IicpError::Node(format!("socket create: {e}")))?;
1125            socket
1126                .set_only_v6(false)
1127                .map_err(|e| IicpError::Node(format!("set_only_v6: {e}")))?;
1128            socket
1129                .set_reuse_address(true)
1130                .map_err(|e| IicpError::Node(format!("set_reuse_address: {e}")))?;
1131            socket
1132                .bind(&addr.into())
1133                .map_err(|e| IicpError::Node(format!("bind {addr}: {e}")))?;
1134            socket
1135                .listen(1024)
1136                .map_err(|e| IicpError::Node(format!("listen: {e}")))?;
1137            let std_listener: std::net::TcpListener = socket.into();
1138            std_listener
1139                .set_nonblocking(true)
1140                .map_err(|e| IicpError::Node(e.to_string()))?;
1141            TcpListener::from_std(std_listener).map_err(|e| IicpError::Node(e.to_string()))?
1142        } else {
1143            TcpListener::bind(addr)
1144                .await
1145                .map_err(|e| IicpError::Node(e.to_string()))?
1146        };
1147
1148        tracing::info!("IICP node {} listening on {}", self.cfg.node_id, addr);
1149
1150        if let Some(token) = node_token {
1151            let node_id = self.cfg.node_id.clone();
1152            let dir = self.cfg.directory_url.clone();
1153            let http = self.http.clone();
1154            let avail = Arc::clone(&hb_availability);
1155            let max_c = self.cfg.max_concurrent;
1156            // Optional file logger shared with the heartbeat background task.
1157            let hb_log: Option<Arc<crate::node_log::NodeLog>> =
1158                self.cfg.log_dir.as_deref().and_then(|d| {
1159                    crate::node_log::NodeLog::open(d, &node_id)
1160                        .map(Arc::new)
1161                        .ok()
1162                });
1163            let hb_node_id = node_id.clone();
1164            let hb_tasks_success = Arc::clone(&tasks_success);
1165            let hb_tasks_failed = Arc::clone(&tasks_failed);
1166            // #399 — re-registration recovery: capture the register payload + the
1167            // shared runtime token so the loop can re-register and update the token
1168            // if the directory drops the node (deregister/TTL-expiry/restart).
1169            let hb_register_payload = self.build_register_payload();
1170            let hb_token_arc = Arc::clone(&self.runtime_token);
1171            let hb_register_url = format!("{}/v1/register", dir.trim_end_matches('/'));
1172            tokio::spawn(async move {
1173                let mut token = token;
1174                let mut seq: u64 = 0;
1175                loop {
1176                    tokio::time::sleep(Duration::from_secs(HEARTBEAT_INTERVAL_SECS)).await;
1177                    seq += 1;
1178                    // Drain incremental task counters so the directory receives
1179                    // the delta since the last heartbeat (ReputationService::upsert
1180                    // expects incremental, not cumulative counts).
1181                    let ok = hb_tasks_success.swap(0, Ordering::Relaxed);
1182                    let fail = hb_tasks_failed.swap(0, Ordering::Relaxed);
1183                    match http
1184                        // /v1/heartbeat — see heartbeat() above for the doubled-prefix
1185                        // history. Same fix applied here in the background loop.
1186                        .post(format!("{}/v1/heartbeat", dir.trim_end_matches('/')))
1187                        .bearer_auth(&token)
1188                        .json(&json!({
1189                            "node_id": &node_id,
1190                            "node_token": &token,
1191                            "status": "available",
1192                            // Live capacity after availability shaping (ADR-006).
1193                            "max_concurrent": avail.effective_max_concurrent(max_c),
1194                            // Task outcome metrics — only sent when non-zero to
1195                            // avoid moving reputation on idle periods.
1196                            "metrics": if ok > 0 || fail > 0 {
1197                                json!({"tasks_success": ok, "tasks_failed": fail})
1198                            } else {
1199                                json!({})
1200                            },
1201                        }))
1202                        .send()
1203                        .await
1204                    {
1205                        Ok(resp) if resp.status().is_success() => {
1206                            if let Some(ref log) = hb_log {
1207                                log.write("heartbeat_ok", &hb_node_id, &format!("seq={seq}"));
1208                            }
1209                        }
1210                        // #399 — directory no longer knows this node (it was
1211                        // deregistered on a prior shutdown, TTL-expired after a
1212                        // heartbeat gap, or the directory restarted). Re-register
1213                        // and resume with the fresh token instead of heartbeating
1214                        // into the void forever.
1215                        Ok(resp) if matches!(resp.status().as_u16(), 401 | 404 | 410) => {
1216                            let code = resp.status().as_u16();
1217                            tracing::warn!(
1218                                "heartbeat rejected ({code}) — node unknown to directory; re-registering"
1219                            );
1220                            match reregister(&http, &hb_register_url, &hb_register_payload).await {
1221                                Some(t) => {
1222                                    token = t;
1223                                    if let Ok(mut g) = hb_token_arc.write() {
1224                                        *g = token.clone();
1225                                    }
1226                                    if let Some(ref log) = hb_log {
1227                                        log.write(
1228                                            "reregister_ok",
1229                                            &hb_node_id,
1230                                            &format!("seq={seq} after_status={code}"),
1231                                        );
1232                                    }
1233                                }
1234                                None => {
1235                                    tracing::warn!("re-registration failed (after status {code})");
1236                                    if let Some(ref log) = hb_log {
1237                                        log.write(
1238                                            "reregister_fail",
1239                                            &hb_node_id,
1240                                            &format!("seq={seq} after_status={code}"),
1241                                        );
1242                                    }
1243                                }
1244                            }
1245                        }
1246                        Ok(resp) => {
1247                            if let Some(ref log) = hb_log {
1248                                log.write(
1249                                    "heartbeat_fail",
1250                                    &hb_node_id,
1251                                    &format!("seq={seq} status={}", resp.status().as_u16()),
1252                                );
1253                            }
1254                        }
1255                        Err(e) => {
1256                            tracing::warn!("heartbeat failed: {e}");
1257                            if let Some(ref log) = hb_log {
1258                                log.write(
1259                                    "heartbeat_fail",
1260                                    &hb_node_id,
1261                                    &format!("seq={seq} error={e}"),
1262                                );
1263                            }
1264                        }
1265                    }
1266                }
1267            });
1268        }
1269
1270        // #343 — pinhole renewal task: extends the UPnP IPv6 firewall pinhole at lease/2.
1271        #[cfg(feature = "nat")]
1272        {
1273            let uid_arc = Arc::clone(&shared_pinhole_uid);
1274            let lease_arc = Arc::clone(&shared_pinhole_lease);
1275            tokio::spawn(async move {
1276                loop {
1277                    let (_uid, lease) = {
1278                        let u = uid_arc.read().ok().and_then(|g| *g);
1279                        let l = lease_arc.read().map(|g| *g).unwrap_or(3600);
1280                        (u, l)
1281                    };
1282                    let delay = Duration::from_secs(u64::from((lease / 2).max(60)));
1283                    tokio::time::sleep(delay).await;
1284                    let uid = match uid_arc.read().ok().and_then(|g| *g) {
1285                        Some(u) => u,
1286                        None => return,
1287                    };
1288                    let ok = crate::nat_detection::renew_ipv6_pinhole(uid, lease).await;
1289                    if ok {
1290                        tracing::debug!("UPnP IPv6 pinhole uid={uid} renewed (lease={lease}s)");
1291                    } else {
1292                        tracing::warn!("UPnP IPv6 pinhole uid={uid} renewal failed — will retry");
1293                    }
1294                }
1295            });
1296        }
1297
1298        // R1: start RelayAcceptServer when relay-capable (#341)
1299        #[cfg(feature = "iicp-tcp")]
1300        if self.cfg.relay_capable {
1301            let relay_reg = relay_sessions_arc;
1302            let relay_host_str = bind_host.clone();
1303            let relay_port = self.cfg.relay_accept_port;
1304            tokio::spawn(async move {
1305                let srv = Arc::new(crate::relay_session::RelayAcceptServer::new(
1306                    (*relay_reg).clone(),
1307                    relay_host_str,
1308                    relay_port,
1309                ));
1310                if let Err(e) = srv.serve().await {
1311                    tracing::warn!("Relay accept server error: {e}");
1312                }
1313            });
1314        }
1315
1316        // R2: start relay worker client if relay_worker_endpoint is configured (#341)
1317        #[cfg(feature = "iicp-tcp")]
1318        if let Some(ref ep) = self.cfg.relay_worker_endpoint {
1319            let ep = ep.clone();
1320            let node_id = self.cfg.node_id.clone();
1321            let intent = self.cfg.intent.clone();
1322            let models = self.cfg.model.clone().map(|m| vec![m]).unwrap_or_default();
1323            let handler_fn: crate::relay_worker_client::RelayHandlerFn =
1324                Arc::new(move |task: Value| {
1325                    let h = Arc::clone(&handler_for_relay);
1326                    Box::pin(async move {
1327                        let req = crate::node::TaskRequest {
1328                            task_id: task
1329                                .get("task_id")
1330                                .and_then(|v| v.as_str())
1331                                .unwrap_or("")
1332                                .to_string(),
1333                            intent: task
1334                                .get("intent")
1335                                .and_then(|v| v.as_str())
1336                                .unwrap_or("")
1337                                .to_string(),
1338                            payload: task.get("payload").cloned().unwrap_or(Value::Null),
1339                            constraints: task.get("constraints").cloned(),
1340                            auth: task.get("auth").cloned(),
1341                            nonce: None,
1342                            _trace: None,
1343                        };
1344                        h(req)
1345                            .await
1346                            .unwrap_or_else(|e| json!({"error": e.to_string()}))
1347                    })
1348                });
1349            let (rhost, rport) = {
1350                if let Some(pos) = ep.rfind(':') {
1351                    let port = ep[pos + 1..].parse::<u16>().unwrap_or(9485);
1352                    (ep[..pos].to_string(), port)
1353                } else {
1354                    (ep.clone(), 9485u16)
1355                }
1356            };
1357            // on_bind: re-register with the relay's public endpoint so the node
1358            // appears ACTIVE in directory + stats (#358).
1359            let http_client = self.http.clone();
1360            let dir_url = self.cfg.directory_url.clone();
1361            let on_bind_cb: crate::relay_worker_client::OnBindFn = Arc::new(
1362                move |rh: String, rp: u16, _wid: String| {
1363                    let http = http_client.clone();
1364                    let dir = dir_url.clone();
1365                    Box::pin(async move {
1366                        // A full re-register would require the IicpNode reference here,
1367                        // which isn't available. For v0.7.0 we log the bind event.
1368                        // The node operator should use the cli bin which has the full
1369                        // context to re-register. Full wiring tracked in #341 R2.
1370                        tracing::info!(
1371                            "Relay worker bound to relay {}:{} — update directory registration to use relay endpoint",
1372                            rh, rp,
1373                        );
1374                        let _ = (http, dir); // suppress unused warnings
1375                    })
1376                },
1377            );
1378            tokio::spawn(async move {
1379                let rwc = Arc::new(
1380                    crate::relay_worker_client::RelayWorkerClient::new(
1381                        node_id, intent, rhost, rport, handler_fn, models,
1382                    )
1383                    .with_on_bind(on_bind_cb),
1384                );
1385                rwc.run().await;
1386            });
1387        }
1388
1389        axum::serve(listener, app)
1390            .await
1391            .map_err(|e| IicpError::Node(e.to_string()))
1392    }
1393}
1394
#[cfg(test)]
mod capability_tests {
    //! Unit tests for `build_capabilities`: how a list of backend model names is
    //! grouped into advertised capabilities (by intent and by input modality).

    use super::build_capabilities;
    use serde_json::json;

    const CHAT: &str = "urn:iicp:intent:llm:chat:v1";
    const EMBED: &str = "urn:iicp:intent:llm:embedding:v1";

    /// Turns a list of string literals into the owned model-name list the
    /// function under test takes.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    // #409 — one backend exposing a chat model alongside an embedding model must
    // advertise BOTH intents (the verified LM Studio case). The old single-cap
    // code fails this.
    #[test]
    fn chat_plus_embedding_models_advertise_two_intents() {
        let models = owned(&[
            "qwen2.5-coder-14b-instruct",
            "text-embedding-nomic-embed-text-v1.5",
        ]);
        let caps = build_capabilities(&models, CHAT, 4096);
        assert_eq!(caps.len(), 2, "should advertise chat + embedding");

        // The chat capability leads (configured model first) ...
        assert_eq!(caps[0]["intent"], CHAT);
        assert_eq!(caps[0]["models"], json!(["qwen2.5-coder-14b-instruct"]));

        // ... and the embedding capability follows.
        assert_eq!(caps[1]["intent"], EMBED);
        assert_eq!(
            caps[1]["models"],
            json!(["text-embedding-nomic-embed-text-v1.5"])
        );
    }

    // Back-compat: with only a chat model there is exactly one, text-only capability.
    #[test]
    fn chat_only_yields_single_capability() {
        let models = owned(&["qwen2.5:0.5b"]);
        let caps = build_capabilities(&models, CHAT, 4096);
        assert_eq!(caps.len(), 1);

        let only = &caps[0];
        assert_eq!(only["intent"], CHAT);
        assert_eq!(only["models"], json!(["qwen2.5:0.5b"]));
        assert_eq!(only["input_modalities"], json!(["text"]));
    }

    // #408/ADR-046 — a vision model gets its own chat capability carrying image
    // input, kept apart from the text-only chat capability. Requires modality
    // grouping to pass.
    #[test]
    fn vision_model_advertises_image_modality_chat_capability() {
        let models = owned(&["qwen2.5-coder-14b", "qwen/qwen3-vl-8b"]);
        let caps = build_capabilities(&models, CHAT, 4096);
        assert_eq!(
            caps.len(),
            2,
            "text-chat and vision-chat are distinct capabilities"
        );

        let (text_chat, vision_chat) = (&caps[0], &caps[1]);
        assert_eq!(text_chat["intent"], CHAT);
        assert_eq!(text_chat["input_modalities"], json!(["text"]));
        assert_eq!(text_chat["models"], json!(["qwen2.5-coder-14b"]));
        assert_eq!(vision_chat["intent"], CHAT);
        assert_eq!(vision_chat["input_modalities"], json!(["text", "image"]));
        assert_eq!(vision_chat["models"], json!(["qwen/qwen3-vl-8b"]));
    }

    // B1/#414 — same idea as the vision case, for audio input: an audio-in chat
    // model gets a chat capability separate from the text-only one.
    #[test]
    fn audio_model_advertises_audio_modality_chat_capability() {
        let models = owned(&["qwen2.5:0.5b", "qwen2-audio-7b"]);
        let caps = build_capabilities(&models, CHAT, 4096);
        assert_eq!(caps.len(), 2);

        let (text_chat, audio_chat) = (&caps[0], &caps[1]);
        assert_eq!(text_chat["input_modalities"], json!(["text"]));
        assert_eq!(audio_chat["intent"], CHAT);
        assert_eq!(audio_chat["input_modalities"], json!(["text", "audio"]));
        assert_eq!(audio_chat["models"], json!(["qwen2-audio-7b"]));
    }

    // B1 — an "omni" model takes image AND audio input within a single chat capability.
    #[test]
    fn omni_model_advertises_image_and_audio_modalities() {
        let models = owned(&["qwen2.5-omni-7b"]);
        let caps = build_capabilities(&models, CHAT, 4096);
        assert_eq!(caps.len(), 1);
        assert_eq!(
            caps[0]["input_modalities"],
            json!(["text", "image", "audio"])
        );
    }

    // With no models at all, the result is still one capability for the default
    // intent, carrying an empty model list (unchanged behaviour).
    #[test]
    fn empty_models_yields_default_intent_capability() {
        let caps = build_capabilities(&[], CHAT, 1024);
        assert_eq!(caps.len(), 1);

        let only = &caps[0];
        assert_eq!(only["intent"], CHAT);
        assert_eq!(only["models"], json!([]));
    }
}
1496
#[cfg(test)]
mod reregister_tests {
    //! Tests for `reregister`, driven against a local mockito HTTP server.
    //! Every test asserts the mock was actually hit, so a `None` result can only
    //! come from the response handling and not from the request never being sent.

    use super::reregister;
    use serde_json::json;

    // #404 — the re-register seam used by the self-healing heartbeat loop:
    // POST the register payload, return the fresh node_token.
    #[tokio::test]
    async fn reregister_returns_fresh_token() {
        let mut server = mockito::Server::new_async().await;
        let m = server
            .mock("POST", "/v1/register")
            .with_status(201)
            .with_body(json!({"node_token": "recovered-xyz"}).to_string())
            .create_async()
            .await;
        let http = reqwest::Client::new();
        let payload = json!({"endpoint": "https://x", "region": "r"});
        let url = format!("{}/v1/register", server.url());
        let tok = reregister(&http, &url, &payload).await;
        assert_eq!(tok, Some("recovered-xyz".to_string()));
        m.assert_async().await;
    }

    // When the response has no `node_token`, the `token` key is used instead.
    #[tokio::test]
    async fn reregister_falls_back_to_token_key() {
        let mut server = mockito::Server::new_async().await;
        let m = server
            .mock("POST", "/v1/register")
            .with_status(200)
            .with_body(json!({"token": "fallback-abc"}).to_string())
            .create_async()
            .await;
        let http = reqwest::Client::new();
        let url = format!("{}/v1/register", server.url());
        let tok = reregister(&http, &url, &json!({})).await;
        assert_eq!(tok, Some("fallback-abc".to_string()));
        m.assert_async().await;
    }

    // `node_token` takes precedence when both keys are present.
    #[tokio::test]
    async fn reregister_prefers_node_token_over_token() {
        let mut server = mockito::Server::new_async().await;
        let m = server
            .mock("POST", "/v1/register")
            .with_status(200)
            .with_body(json!({"node_token": "primary", "token": "secondary"}).to_string())
            .create_async()
            .await;
        let http = reqwest::Client::new();
        let url = format!("{}/v1/register", server.url());
        let tok = reregister(&http, &url, &json!({})).await;
        assert_eq!(tok, Some("primary".to_string()));
        m.assert_async().await;
    }

    // A non-2xx status yields no token.
    #[tokio::test]
    async fn reregister_none_on_failure() {
        let mut server = mockito::Server::new_async().await;
        let m = server
            .mock("POST", "/v1/register")
            .with_status(500)
            .create_async()
            .await;
        let http = reqwest::Client::new();
        let url = format!("{}/v1/register", server.url());
        let tok = reregister(&http, &url, &json!({})).await;
        assert_eq!(tok, None);
        // Without this the test would pass even if no request had been made.
        m.assert_async().await;
    }

    // A 2xx response whose JSON body carries neither token key yields no token.
    #[tokio::test]
    async fn reregister_none_when_success_body_has_no_token() {
        let mut server = mockito::Server::new_async().await;
        let m = server
            .mock("POST", "/v1/register")
            .with_status(200)
            .with_body(json!({"status": "ok"}).to_string())
            .create_async()
            .await;
        let http = reqwest::Client::new();
        let url = format!("{}/v1/register", server.url());
        let tok = reregister(&http, &url, &json!({})).await;
        assert_eq!(tok, None);
        m.assert_async().await;
    }
}