1use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use axum::{
17 extract::State,
18 http::{HeaderMap, StatusCode},
19 response::{IntoResponse, Response},
20 routing::{get, post},
21 Json, Router,
22};
23use reqwest::Client;
24use serde::{Deserialize, Serialize};
25use serde_json::{json, Value};
26use socket2::{Domain, Protocol, Socket, Type};
27use tokio::net::TcpListener;
28use tokio::sync::Mutex;
29
30use crate::errors::{IicpError, Result};
31
/// Directory base URL that `NodeConfig::new` installs as `directory_url`.
const DEFAULT_DIRECTORY: &str = "https://iicp.network/api";
/// Seconds the background heartbeat task sleeps between two heartbeats.
const HEARTBEAT_INTERVAL_SECS: u64 = 30;
/// Seconds a task nonce stays in the replay cache before it is evicted.
const NONCE_TTL_SECS: u64 = 300;
35
36async fn reregister(http: &Client, url: &str, payload: &serde_json::Value) -> Option<String> {
40 let resp = http.post(url).json(payload).send().await.ok()?;
41 if !resp.status().is_success() {
42 return None;
43 }
44 let data = resp.json::<serde_json::Value>().await.ok()?;
45 data["node_token"]
46 .as_str()
47 .or_else(|| data["token"].as_str())
48 .map(String::from)
49}
50
/// Picks the intent URN advertised for `model`.
///
/// Any model whose name contains `embed` (case-insensitively) is mapped to the
/// embedding intent; every other model gets `default_intent`.
fn intent_for_model(model: &str, default_intent: &str) -> String {
    let is_embedding = model.to_lowercase().contains("embed");
    let chosen = if is_embedding {
        "urn:iicp:intent:llm:embedding:v1"
    } else {
        default_intent
    };
    chosen.to_owned()
}
63
/// Derives the input modalities advertised for `model` from its name.
///
/// The match is case-insensitive. `"text"` is always present; `"image"` is
/// added for names containing `-vl-`, `vision`, `llava` or `omni`, or ending
/// in `-vl`; `"audio"` is added for names containing `audio`, `voxtral` or
/// `omni`. The result is always ordered text, image, audio.
fn modalities_for_model(model: &str) -> Vec<&'static str> {
    let lowered = model.to_lowercase();
    let mentions = |needles: &[&str]| needles.iter().any(|needle| lowered.contains(*needle));

    let sees_images = lowered.ends_with("-vl") || mentions(&["-vl-", "vision", "llava", "omni"]);
    let hears_audio = mentions(&["audio", "voxtral", "omni"]);

    // Fixed-order table keeps the output ordering independent of the checks.
    [("text", true), ("image", sees_images), ("audio", hears_audio)]
        .iter()
        .filter(|(_, enabled)| *enabled)
        .map(|(name, _)| *name)
        .collect()
}
87
88fn build_capabilities(models: &[String], default_intent: &str, max_tokens: u32) -> Vec<Value> {
97 if models.is_empty() {
98 return vec![json!({
99 "intent": default_intent, "models": [], "max_tokens": max_tokens,
100 "input_modalities": ["text"],
101 })];
102 }
103 let mut order: Vec<String> = Vec::new();
105 let mut groups: HashMap<String, (String, Vec<&'static str>, Vec<String>)> = HashMap::new();
106 for m in models {
107 let intent = intent_for_model(m, default_intent);
108 let modalities = modalities_for_model(m);
109 let key = format!("{intent}\u{0}{}", modalities.join(","));
110 let entry = groups.entry(key.clone()).or_insert_with(|| {
111 order.push(key.clone());
112 (intent.clone(), modalities.clone(), Vec::new())
113 });
114 if !entry.2.contains(m) {
115 entry.2.push(m.clone());
116 }
117 }
118 order
119 .into_iter()
120 .map(|key| {
121 let (intent, modalities, models) = groups.remove(&key).expect("key from order");
122 json!({
123 "intent": intent,
124 "models": models,
125 "max_tokens": max_tokens,
126 "input_modalities": modalities,
127 })
128 })
129 .collect()
130}
131
/// Static configuration of an IICP node.
///
/// Construct it with [`NodeConfig::new`] and then override individual fields.
#[derive(Debug, Clone)]
pub struct NodeConfig {
    /// Node identifier; included in the registration payload only when non-empty.
    pub node_id: String,
    /// Endpoint advertised to the directory on registration.
    pub endpoint: String,
    /// Default intent used for models that are not detected as embedding models.
    pub intent: String,
    /// Primary model; listed first among the advertised models when set.
    pub model: Option<String>,
    /// Sent as `backend` in the registration payload when set.
    pub backend: Option<String>,
    /// Region sent on registration; `"eu-central"` is sent when unset.
    pub region: Option<String>,
    /// Additional model names merged (deduplicated) after `model` when the
    /// registration capabilities are built.
    pub capabilities: Vec<String>,
    /// Base URL of the directory service; a trailing `/` is tolerated.
    pub directory_url: String,
    /// Base timeout in milliseconds; the HTTP client adds 2000 ms on top.
    pub timeout_ms: u64,
    /// Configured cap on concurrently admitted tasks.
    pub max_concurrent: usize,
    /// Sent as `limits.tokens_per_min` on registration.
    pub tokens_per_min: u32,
    /// Sent as `max_tokens` in every advertised capability.
    pub max_tokens: u32,
    /// Sent as `transport_endpoint` on registration when set.
    pub transport_endpoint: Option<String>,
    /// Sent as `transport_method` on registration when set.
    pub transport_method: Option<String>,
    /// Sent as `nat_type` on registration when set.
    pub nat_type: Option<String>,
    /// Sent verbatim as `transport_metadata` on registration when set.
    pub transport_metadata: Option<serde_json::Value>,
    /// Sent as `exposure_mode` on registration when set.
    pub exposure_mode: Option<String>,
    /// Policy override; `crate::cip_policy::get_cip_policy()` is used when unset.
    pub cip_policy: Option<std::sync::Arc<crate::cip_policy::CooperativeInferencePolicy>>,
    /// When set, a pricing block is built from it and attached to the registration.
    pub pricing: Option<crate::pricing::PricingConfig>,
    /// Operator-supplied HMAC key. When empty, `register()` adopts a non-empty
    /// `node_hmac_key` returned by the directory as the runtime key.
    pub node_hmac_key: String,
    /// Windows handed to the `AvailabilityEvaluator` that derives the
    /// effective concurrency cap.
    pub availability_windows: Vec<crate::availability::Window>,
    /// When true, tasks whose `task_id` was already registered are rejected.
    pub enable_idempotency: bool,
    /// When true, the peer gossip task is spawned and `/v1/peers` is mounted.
    pub enable_mesh: bool,
    /// When true, `/v1/relay` is mounted.
    pub relay_capable: bool,
    /// Port handed to the peer manager and the relay accept server.
    pub relay_accept_port: u16,
    /// Relay endpoint this node should attach to as a worker, if any.
    pub relay_worker_endpoint: Option<String>,
    /// Directory in which the heartbeat task opens its node log, if set.
    pub log_dir: Option<std::path::PathBuf>,
}
209
210impl NodeConfig {
211 pub fn new(
212 node_id: impl Into<String>,
213 endpoint: impl Into<String>,
214 intent: impl Into<String>,
215 ) -> Self {
216 Self {
217 node_id: node_id.into(),
218 endpoint: endpoint.into(),
219 intent: intent.into(),
220 model: None,
221 backend: None,
222 region: None,
223 capabilities: vec![],
224 directory_url: DEFAULT_DIRECTORY.into(),
225 timeout_ms: 5_000,
226 max_concurrent: 4,
227 tokens_per_min: 10_000,
228 max_tokens: 8_192,
229 transport_endpoint: None,
230 transport_method: None,
231 nat_type: None,
232 transport_metadata: None,
233 exposure_mode: None,
234 cip_policy: None,
235 pricing: None,
236 node_hmac_key: String::new(),
237 availability_windows: Vec::new(),
238 enable_idempotency: false,
239 enable_mesh: false,
240 relay_capable: false,
241 relay_accept_port: 9485,
242 relay_worker_endpoint: None,
243 log_dir: None,
244 }
245 }
246}
247
/// Task submitted to the node, deserialized from the `/v1/task` request body.
#[derive(Debug, Deserialize)]
pub struct TaskRequest {
    /// Caller-chosen task identifier; echoed back in the response.
    pub task_id: String,
    /// Intent of the task; checked against the node's CIP policy.
    pub intent: String,
    /// Opaque task payload handed to the task handler.
    pub payload: Value,
    /// Optional constraints; `qos_class` is read from here for admission.
    pub constraints: Option<Value>,
    /// Optional authentication data, passed through to the handler.
    pub auth: Option<Value>,
    /// Optional nonce used for replay detection.
    pub nonce: Option<String>,
    /// Never read from the request body; filled by the task endpoint from the
    /// `traceparent` header when one is present.
    #[serde(skip_deserializing)]
    pub _trace: Option<Value>,
}
260
/// JSON body returned by the task endpoint once the handler has run.
#[derive(Debug, Serialize)]
pub struct TaskResponse {
    /// Identifier of the task this response belongs to.
    pub task_id: String,
    /// `"completed"` on success, `"error"` when the handler failed.
    pub status: String,
    /// Handler output; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    /// Error details; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<Value>,
}
270
/// Type-erased, shareable task handler: takes ownership of a [`TaskRequest`]
/// and returns a boxed, `Send` future resolving to the task's result value.
pub type TaskHandlerFn = Arc<
    dyn Fn(
            TaskRequest,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send>>
        + Send
        + Sync,
>;
278
/// State shared by all HTTP handlers of a serving node.
struct AppState {
    /// User-supplied handler invoked for every admitted task.
    handler: TaskHandlerFn,
    /// Node identifier reported by the health endpoint.
    node_id: String,
    /// Region reported by the health endpoint.
    region: String,
    /// Intent reported by the health endpoint.
    intent: String,
    /// Model reported by the health endpoint.
    model: String,
    /// Number of tasks currently holding an admission slot.
    active_jobs: Arc<AtomicUsize>,
    /// Tasks completed successfully; the heartbeat task reads and resets it.
    tasks_success: Arc<AtomicUsize>,
    /// Tasks that failed; the heartbeat task reads and resets it.
    tasks_failed: Arc<AtomicUsize>,
    /// Configured concurrency cap, before availability is applied.
    max_concurrent: usize,
    /// Derives the effective concurrency cap from `max_concurrent`.
    availability: Arc<crate::availability::AvailabilityEvaluator>,
    /// Policy consulted to decide whether a task's intent is permitted.
    cip_policy: Arc<crate::cip_policy::CooperativeInferencePolicy>,
    /// Registry of task ids used for duplicate detection.
    idempotency: Arc<crate::idempotency::IdempotencyGuard>,
    /// Whether duplicate detection via `idempotency` is active.
    enable_idempotency: bool,
    /// Peer list, exchange-signature verification and relay target lookup.
    peer_manager: Arc<crate::peer_manager::PeerManager>,
    /// Outbound HTTP client used when relaying tasks to peers.
    http: reqwest::Client,
    /// Seen nonces mapped to the instant they were first accepted.
    nonce_cache: Arc<Mutex<HashMap<String, Instant>>>,
    /// Id of the active pinhole, if any; reported by the health endpoint.
    pinhole_uid: Arc<std::sync::RwLock<Option<u32>>>,
    /// Pinhole lease in seconds; reported by the health endpoint.
    pinhole_lease_seconds: Arc<std::sync::RwLock<u32>>,
    /// Relay sessions looked up by target node id in the relay endpoint.
    #[cfg(feature = "iicp-tcp")]
    relay_sessions: Arc<crate::relay_session::RelaySessionRegistry>,
}
305
306async fn health_endpoint(State(state): State<Arc<AppState>>) -> impl IntoResponse {
309 let active = state.active_jobs.load(Ordering::Relaxed);
310 let uid = state.pinhole_uid.read().ok().and_then(|g| *g);
311 let lease = state
312 .pinhole_lease_seconds
313 .read()
314 .map(|g| *g)
315 .unwrap_or(3600);
316 let pinhole_state = if let Some(uid) = uid {
317 json!({ "active": true, "unique_id": uid, "lease_seconds": lease })
318 } else {
319 json!({ "active": false })
320 };
321 let eff_max = state
322 .availability
323 .effective_max_concurrent(state.max_concurrent);
324 Json(json!({
325 "status": "ok",
326 "node_id": state.node_id,
327 "region": state.region,
328 "load": (active as f64 / state.max_concurrent.max(1) as f64),
329 "active_jobs": active,
330 "max_concurrent": state.max_concurrent,
331 "effective_max_concurrent": eff_max,
332 "available": active < eff_max,
333 "model": state.model,
334 "intent": state.intent,
335 "pinhole_state": pinhole_state,
336 }))
337}
338
339async fn metrics_endpoint() -> Response {
342 #[cfg(feature = "metrics")]
343 {
344 use prometheus::{Encoder, TextEncoder};
345 let encoder = TextEncoder::new();
346 let mf = prometheus::gather();
347 let mut buf = Vec::new();
348 if encoder.encode(&mf, &mut buf).is_ok() {
349 return (
350 StatusCode::OK,
351 [(
352 axum::http::header::CONTENT_TYPE,
353 "text/plain; version=0.0.4",
354 )],
355 buf,
356 )
357 .into_response();
358 }
359 }
360 (
361 StatusCode::SERVICE_UNAVAILABLE,
362 "metrics feature not enabled",
363 )
364 .into_response()
365}
366
367async fn peers_endpoint(
370 State(state): State<Arc<AppState>>,
371 headers: HeaderMap,
372 body: axum::body::Bytes,
373) -> Response {
374 let sig = headers
375 .get("x-iicp-signature")
376 .and_then(|v| v.to_str().ok());
377 if !state.peer_manager.verify_exchange(&body, sig) {
378 return (
379 StatusCode::UNAUTHORIZED,
380 Json(json!({"error":{"code":"IICP-E012","message":"invalid_signature"}})),
381 )
382 .into_response();
383 }
384 if let Ok(parsed) = serde_json::from_slice::<Value>(&body) {
385 if let Some(arr) = parsed.get("known_peers").and_then(Value::as_array) {
386 let dicts: Vec<Value> = arr.iter().filter(|p| p.is_object()).cloned().collect();
387 state.peer_manager.merge_peers(&dicts);
388 }
389 }
390 let peers: Vec<Value> = state
391 .peer_manager
392 .get_peers()
393 .iter()
394 .map(|p| {
395 json!({
396 "node_id": p.node_id,
397 "endpoint": p.endpoint,
398 "region": p.region,
399 "last_seen": p.last_seen,
400 })
401 })
402 .collect();
403 Json(json!({ "peers": peers })).into_response()
404}
405
406async fn relay_endpoint(
409 State(state): State<Arc<AppState>>,
410 Json(payload): Json<Value>,
411) -> Response {
412 let target_id = payload
413 .get("target_node_id")
414 .and_then(Value::as_str)
415 .unwrap_or("");
416 let task = payload.get("task");
417 if target_id.is_empty() || task.is_none() {
418 return (
419 StatusCode::UNPROCESSABLE_ENTITY,
420 Json(
421 json!({"error":{"code":"IICP-E000","message":"target_node_id and task required"}}),
422 ),
423 )
424 .into_response();
425 }
426 let task_val = task.expect("checked above").clone();
427
428 #[cfg(feature = "iicp-tcp")]
430 if let Some(session) = state.relay_sessions.get(target_id) {
431 match session.forward_task(&task_val, 120).await {
432 Ok(result) => {
433 let task_id = task_val
434 .get("task_id")
435 .and_then(Value::as_str)
436 .unwrap_or("");
437 return Json(json!({
438 "task_id": task_id,
439 "status": "completed",
440 "result": result
441 }))
442 .into_response();
443 }
444 Err(e) => {
445 return (
446 StatusCode::BAD_GATEWAY,
447 Json(json!({"error":{"code":"IICP-E031","message":format!("relay session forward failed: {e}")}})),
448 )
449 .into_response();
450 }
451 }
452 }
453
454 let target = match state.peer_manager.relay_target(target_id) {
456 Some(t) => t,
457 None => {
458 return (
459 StatusCode::NOT_FOUND,
460 Json(json!({"error":{"code":"IICP-E030","message":"target not in peer list and not a bound relay worker"}})),
461 )
462 .into_response();
463 }
464 };
465 let url = format!("{}/v1/task", target.endpoint.trim_end_matches('/'));
466 match state
467 .http
468 .post(&url)
469 .timeout(Duration::from_secs(120))
470 .json(&task_val)
471 .send()
472 .await
473 {
474 Ok(resp) => {
475 let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::OK);
476 let bytes = resp.bytes().await.unwrap_or_default();
477 (status, bytes).into_response()
478 }
479 Err(e) => (
480 StatusCode::BAD_GATEWAY,
481 Json(json!({"error":{"code":"IICP-E031","message":format!("relay failed: {e}")}})),
482 )
483 .into_response(),
484 }
485}
486
487async fn admit(state: &AppState, qos: &str) -> bool {
494 let cap = state
497 .availability
498 .effective_max_concurrent(state.max_concurrent);
499 let prev = state.active_jobs.fetch_add(1, Ordering::Relaxed);
500 if prev < cap {
501 return true;
502 }
503 state.active_jobs.fetch_sub(1, Ordering::Relaxed);
504 if !crate::scheduler::is_queue_eligible(qos) {
505 return false;
506 }
507 let deadline = Instant::now() + crate::scheduler::QUEUE_WAIT;
508 while Instant::now() < deadline {
509 tokio::time::sleep(Duration::from_millis(50)).await;
510 let cap = state
511 .availability
512 .effective_max_concurrent(state.max_concurrent);
513 let prev = state.active_jobs.fetch_add(1, Ordering::Relaxed);
514 if prev < cap {
515 return true;
516 }
517 state.active_jobs.fetch_sub(1, Ordering::Relaxed);
518 }
519 false
520}
521
522async fn task_endpoint(
523 State(state): State<Arc<AppState>>,
524 headers: HeaderMap,
525 Json(mut req): Json<TaskRequest>,
526) -> Response {
527 if !state.cip_policy.permits_intent(&req.intent) {
532 return (
533 StatusCode::FORBIDDEN,
534 Json(json!({
535 "error": {
536 "code": "tool_execution_denied",
537 "message": "Tool-execution intents are not permitted by this node's CIP policy",
538 }
539 })),
540 )
541 .into_response();
542 }
543
544 let qos = req
546 .constraints
547 .as_ref()
548 .and_then(|c| c.get("qos_class"))
549 .and_then(|v| v.as_str())
550 .unwrap_or("best_effort")
551 .to_string();
552 if !admit(&state, &qos).await {
553 return (
554 StatusCode::TOO_MANY_REQUESTS,
555 [("Retry-After", "2"), ("Content-Type", "application/json")],
556 Json(json!({
557 "error": {
558 "code": "IICP-E021",
559 "message": "capacity_exceeded",
560 "qos_class": qos,
561 "retry_after_ms": 2000,
562 }
563 })),
564 )
565 .into_response();
566 }
567
568 if let Some(ref nonce) = req.nonce {
570 let mut cache = state.nonce_cache.lock().await;
571 cache.retain(|_, inserted_at| inserted_at.elapsed().as_secs() < NONCE_TTL_SECS);
572 if cache.contains_key(nonce) {
573 state.active_jobs.fetch_sub(1, Ordering::Relaxed);
574 return (
575 StatusCode::CONFLICT,
576 Json(json!({
577 "error": { "code": "IICP-E011", "message": "replay_detected" }
578 })),
579 )
580 .into_response();
581 }
582 cache.insert(nonce.clone(), Instant::now());
583 }
584
585 if state.enable_idempotency && !state.idempotency.check_and_register(&req.task_id) {
588 state.active_jobs.fetch_sub(1, Ordering::Relaxed);
589 return (
590 StatusCode::CONFLICT,
591 Json(json!({
592 "error": { "code": "IICP-E010", "message": "duplicate_task" }
593 })),
594 )
595 .into_response();
596 }
597
598 if let Some(tp) = headers.get("traceparent").and_then(|v| v.to_str().ok()) {
600 req._trace = Some(json!({ "traceparent": tp }));
601 }
602
603 let task_id = req.task_id.clone();
604 let result = {
609 let span = tracing::info_span!(
610 "iicp.task.execute",
611 "iicp.task_id" = %task_id,
612 "iicp.intent" = %req.intent,
613 );
614 let _guard = span.enter();
615 (state.handler)(req).await
616 };
617 state.active_jobs.fetch_sub(1, Ordering::Relaxed);
618
619 match result {
620 Ok(value) => {
621 state.tasks_success.fetch_add(1, Ordering::Relaxed);
622 Json(TaskResponse {
623 task_id,
624 status: "completed".into(),
625 result: Some(value),
626 error: None,
627 })
628 .into_response()
629 }
630 Err(e) => {
631 state.tasks_failed.fetch_add(1, Ordering::Relaxed);
632 (
633 StatusCode::INTERNAL_SERVER_ERROR,
634 Json(TaskResponse {
635 task_id,
636 status: "error".into(),
637 result: None,
638 error: Some(json!({ "message": e.to_string() })),
639 }),
640 )
641 .into_response()
642 }
643 }
644}
645
/// An IICP node: configuration plus the runtime state needed to register
/// with the directory, send heartbeats and serve tasks.
pub struct IicpNode {
    /// Static configuration the node was created with.
    cfg: NodeConfig,
    /// HTTP client used for all directory calls.
    http: Client,
    /// HMAC key in effect at runtime: starts as `cfg.node_hmac_key` and is
    /// replaced by a directory-issued key during `register()` when the
    /// configured key is empty.
    runtime_hmac_key: std::sync::RwLock<String>,
    /// Most recent node token stored by `register()`; `deregister()` falls
    /// back to it when no token is passed explicitly.
    runtime_token: Arc<std::sync::RwLock<String>>,
    /// Id of the IPv6 pinhole recorded by `apply_nat_profile`, if any.
    #[allow(dead_code)]
    pinhole_uid: std::sync::RwLock<Option<u32>>,
    /// Lease of that pinhole in seconds; starts at 3600.
    #[allow(dead_code)]
    pinhole_lease_seconds: std::sync::RwLock<u32>,
    /// Challenge taken from the latest heartbeat response; it is signed and
    /// sent back with subsequent heartbeats.
    liveness_challenge: std::sync::RwLock<Option<String>>,
}
671
672impl IicpNode {
673 pub fn new(cfg: NodeConfig) -> Self {
674 let http = Client::builder()
675 .timeout(Duration::from_millis(cfg.timeout_ms + 2_000))
676 .use_rustls_tls()
677 .build()
678 .expect("failed to build HTTP client");
679 let runtime_hmac_key = std::sync::RwLock::new(cfg.node_hmac_key.clone());
680 Self {
681 cfg,
682 http,
683 runtime_hmac_key,
684 runtime_token: Arc::new(std::sync::RwLock::new(String::new())),
685 pinhole_uid: std::sync::RwLock::new(None),
686 pinhole_lease_seconds: std::sync::RwLock::new(3600),
687 liveness_challenge: std::sync::RwLock::new(None),
688 }
689 }
690
691 pub fn node_hmac_key(&self) -> String {
694 self.runtime_hmac_key.read().expect("poisoned").clone()
695 }
696
    /// Borrows the node's current configuration.
    pub fn cfg(&self) -> &NodeConfig {
        &self.cfg
    }
703
    /// Stores `endpoint` as the configuration's `relay_worker_endpoint`,
    /// replacing any previous value.
    pub fn set_relay_worker_endpoint(&mut self, endpoint: String) {
        self.cfg.relay_worker_endpoint = Some(endpoint);
    }
710
711 #[cfg(feature = "nat")]
722 pub fn apply_nat_profile(&mut self, profile: &crate::nat_detection::NatProfile) {
723 if profile.is_reachable() {
724 if let Some(pub_ep) = &profile.public_endpoint {
725 self.cfg.endpoint = pub_ep.clone();
726 }
727 }
728 if let Some(tep) = &profile.transport_endpoint {
729 self.cfg.transport_endpoint = Some(tep.clone());
730 }
731 let tm = match profile.transport_method {
732 crate::nat_detection::TransportMethod::Direct => Some("direct"),
733 crate::nat_detection::TransportMethod::UpnpMapped => Some("upnp_mapped"),
734 crate::nat_detection::TransportMethod::StunHolePunch => Some("stun_hole_punch"),
735 crate::nat_detection::TransportMethod::TurnRelay => Some("turn_relay"),
736 crate::nat_detection::TransportMethod::ExternalTunnel => Some("external_tunnel"),
737 crate::nat_detection::TransportMethod::Unreachable => None,
738 };
739 if let Some(name) = tm {
740 self.cfg.transport_method = Some(name.into());
741 }
742 if self.cfg.nat_type.is_none() {
743 self.cfg.nat_type = Some("unknown".into());
744 }
745 let tail: Vec<&str> = profile
746 .detection_log
747 .iter()
748 .rev()
749 .take(1)
750 .map(|s| s.as_str())
751 .collect();
752 self.cfg.transport_metadata = Some(serde_json::json!({
753 "tier": profile.tier,
754 "detection_log_tail": tail,
755 }));
756 self.cfg.exposure_mode = Some(
759 crate::qualify::qualify_service(profile)
760 .exposure_mode
761 .to_string(),
762 );
763 if let Some(v6) = &profile.ipv6 {
765 if v6.pinhole_active {
766 if let Some(uid) = v6.pinhole_unique_id {
767 if let Ok(mut slot) = self.pinhole_uid.write() {
768 *slot = Some(uid);
769 }
770 }
771 if let Some(lease) = v6.pinhole_lease_seconds {
772 if let Ok(mut slot) = self.pinhole_lease_seconds.write() {
773 *slot = lease;
774 }
775 }
776 }
777 }
778 }
779
780 #[cfg(feature = "nat")]
782 pub async fn revoke_pinhole(&self) -> bool {
783 let uid = match self.pinhole_uid.write() {
784 Ok(mut slot) => slot.take(),
785 Err(_) => None,
786 };
787 match uid {
788 Some(uid) => crate::nat_detection::delete_ipv6_pinhole(uid).await,
789 None => false,
790 }
791 }
792
793 pub async fn deregister(&self, node_token: Option<&str>) -> Result<()> {
801 let stashed = self.runtime_token.read().expect("poisoned").clone();
802 let token = node_token.map(str::to_string).unwrap_or(stashed);
803 if token.is_empty() {
804 return Err(crate::errors::IicpError::Node(
805 "deregister() requires a node_token (none stashed — call register() first)".into(),
806 ));
807 }
808 let url = format!(
809 "{}/v1/register",
810 self.cfg.directory_url.trim_end_matches('/')
811 );
812 let resp = self
813 .http
814 .delete(&url)
815 .bearer_auth(&token)
816 .json(&serde_json::json!({"node_id": self.cfg.node_id}))
817 .send()
818 .await?;
819 let status = resp.status();
820 if !status.is_success() && status.as_u16() != 404 {
821 return Err(crate::errors::IicpError::Node(format!(
822 "Deregister failed: {status}"
823 )));
824 }
825 Ok(())
826 }
827
828 fn build_register_payload(&self) -> Value {
838 let mut models: Vec<String> = match &self.cfg.model {
841 Some(m) => vec![m.clone()],
842 None => Vec::new(),
843 };
844 for cap in &self.cfg.capabilities {
845 if !models.contains(cap) {
846 models.push(cap.clone());
847 }
848 }
849 let region = self
850 .cfg
851 .region
852 .clone()
853 .unwrap_or_else(|| "eu-central".to_string());
854
855 let mut payload = json!({
856 "endpoint": self.cfg.endpoint,
857 "region": region,
858 "capabilities": build_capabilities(&models, &self.cfg.intent, self.cfg.max_tokens),
862 "limits": {
863 "max_concurrent": self.cfg.max_concurrent,
864 "tokens_per_min": self.cfg.tokens_per_min,
865 },
866 });
867 if !self.cfg.node_id.is_empty() {
868 payload["node_id"] = json!(self.cfg.node_id);
869 }
870 if let Some(t) = &self.cfg.transport_endpoint {
871 payload["transport_endpoint"] = json!(t);
872 }
873 if let Some(m) = &self.cfg.transport_method {
874 payload["transport_method"] = json!(m);
875 }
876 if let Some(n) = &self.cfg.nat_type {
877 payload["nat_type"] = json!(n);
878 }
879 if let Some(md) = &self.cfg.transport_metadata {
880 payload["transport_metadata"] = md.clone();
881 }
882 if let Some(e) = &self.cfg.exposure_mode {
883 payload["exposure_mode"] = json!(e);
884 }
885 payload["sdk_language"] = json!("rust");
886 payload["sdk_version"] = json!(env!("CARGO_PKG_VERSION"));
887 if let Some(b) = &self.cfg.backend {
888 payload["backend"] = json!(b);
889 }
890 let policy_arc = self
891 .cfg
892 .cip_policy
893 .clone()
894 .unwrap_or_else(crate::cip_policy::get_cip_policy);
895 if let Some(block) = policy_arc.as_register_policy_block() {
896 payload["policy"] = block;
897 }
898 if let Some(pricing) = &self.cfg.pricing {
899 let hmac_key = self.runtime_hmac_key.read().expect("poisoned").clone();
900 payload["pricing"] = crate::pricing::build_pricing_block(pricing, &hmac_key);
901 }
902 if !self.cfg.node_hmac_key.is_empty() {
903 payload["node_hmac_key"] = json!(self.cfg.node_hmac_key);
904 }
905 payload
906 }
907
908 pub async fn register(&self) -> Result<String> {
909 let payload = self.build_register_payload();
910
911 let resp = self
912 .http
913 .post(format!(
914 "{}/v1/register",
915 self.cfg.directory_url.trim_end_matches('/')
916 ))
917 .json(&payload)
918 .send()
919 .await
920 .map_err(|e| IicpError::Node(e.to_string()))?;
921
922 if !resp.status().is_success() {
923 return Err(IicpError::Node(format!(
924 "register failed: {}",
925 resp.status()
926 )));
927 }
928 let data: Value = resp
929 .json()
930 .await
931 .map_err(|e| IicpError::Node(e.to_string()))?;
932 let token = data["node_token"]
933 .as_str()
934 .or_else(|| data["token"].as_str())
935 .ok_or_else(|| IicpError::Node(format!("no node_token in response: {data}")))?;
936 *self.runtime_token.write().expect("poisoned") = token.to_string();
938 if self.cfg.node_hmac_key.is_empty() {
942 if let Some(dir_key) = data["node_hmac_key"].as_str() {
943 if !dir_key.is_empty() {
944 let mut guard = self.runtime_hmac_key.write().expect("poisoned");
945 *guard = dir_key.to_string();
946 }
947 }
948 }
949 Ok(token.to_string())
950 }
951
952 pub async fn heartbeat(&self, node_token: &str) -> Result<()> {
954 let mut body = json!({
955 "node_id": self.cfg.node_id,
956 "node_token": node_token,
957 "status": "available",
958 "max_concurrent": crate::availability::AvailabilityEvaluator::new(
960 self.cfg.availability_windows.clone(),
961 )
962 .effective_max_concurrent(self.cfg.max_concurrent),
963 });
964 let hmac_key = self.node_hmac_key();
968 let stored = self.liveness_challenge.read().expect("poisoned").clone();
969 if let Some(ch) = &stored {
970 if !hmac_key.is_empty() {
971 body["challenge_response"] =
972 json!(crate::pricing::sign_body(ch.as_bytes(), &hmac_key));
973 }
974 }
975
976 let resp = self
977 .http
978 .post(format!(
982 "{}/v1/heartbeat",
983 self.cfg.directory_url.trim_end_matches('/')
984 ))
985 .bearer_auth(node_token)
988 .json(&body)
989 .send()
990 .await
991 .map_err(|e| IicpError::Node(e.to_string()))?;
992
993 if !resp.status().is_success() {
994 return Err(IicpError::Node(format!(
995 "heartbeat failed: {}",
996 resp.status()
997 )));
998 }
999 if let Ok(data) = resp.json::<Value>().await {
1001 if let Some(ch) = data["challenge"].as_str() {
1002 *self.liveness_challenge.write().expect("poisoned") = Some(ch.to_string());
1003 }
1004 }
1005 Ok(())
1006 }
1007
1008 pub async fn serve<F, Fut>(
1013 &self,
1014 handler: F,
1015 addr: &str,
1016 node_token: Option<String>,
1017 ) -> Result<()>
1018 where
1019 F: Fn(TaskRequest) -> Fut + Send + Sync + 'static,
1020 Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
1021 {
1022 let handler: TaskHandlerFn = Arc::new(move |req| Box::pin(handler(req)));
1023 #[cfg(feature = "iicp-tcp")]
1025 let handler_for_relay = Arc::clone(&handler);
1026 #[cfg(feature = "iicp-tcp")]
1028 let bind_host: String = addr.split(':').next().unwrap_or("0.0.0.0").to_string();
1029 let active_jobs = Arc::new(AtomicUsize::new(0));
1030 let nonce_cache = Arc::new(Mutex::new(HashMap::new()));
1031 let shared_pinhole_uid: Arc<std::sync::RwLock<Option<u32>>> = Arc::new(
1033 std::sync::RwLock::new(self.pinhole_uid.read().ok().and_then(|g| *g)),
1034 );
1035 let shared_pinhole_lease: Arc<std::sync::RwLock<u32>> = Arc::new(std::sync::RwLock::new(
1036 self.pinhole_lease_seconds
1037 .read()
1038 .map(|g| *g)
1039 .unwrap_or(3600),
1040 ));
1041
1042 let tasks_success = Arc::new(AtomicUsize::new(0));
1043 let tasks_failed = Arc::new(AtomicUsize::new(0));
1044 let state = Arc::new(AppState {
1045 handler,
1046 node_id: self.cfg.node_id.clone(),
1047 region: self.cfg.region.clone().unwrap_or_else(|| "unknown".into()),
1048 intent: self.cfg.intent.clone(),
1049 model: self.cfg.model.clone().unwrap_or_default(),
1050 active_jobs,
1051 tasks_success: Arc::clone(&tasks_success),
1052 tasks_failed: Arc::clone(&tasks_failed),
1053 max_concurrent: self.cfg.max_concurrent,
1054 availability: Arc::new(crate::availability::AvailabilityEvaluator::new(
1055 self.cfg.availability_windows.clone(),
1056 )),
1057 cip_policy: self
1059 .cfg
1060 .cip_policy
1061 .clone()
1062 .unwrap_or_else(crate::cip_policy::get_cip_policy),
1063 idempotency: Arc::new(crate::idempotency::IdempotencyGuard::default()),
1064 enable_idempotency: self.cfg.enable_idempotency,
1065 peer_manager: Arc::new(crate::peer_manager::PeerManager::with_opts(
1066 self.cfg.directory_url.clone(),
1067 self.cfg.node_hmac_key.clone(),
1068 crate::peer_manager::PeerManagerOpts {
1069 relay_capable: self.cfg.relay_capable,
1070 relay_accept_port: self.cfg.relay_accept_port,
1071 },
1072 )),
1073 http: self.http.clone(),
1074 nonce_cache,
1075 pinhole_uid: Arc::clone(&shared_pinhole_uid),
1076 pinhole_lease_seconds: Arc::clone(&shared_pinhole_lease),
1077 #[cfg(feature = "iicp-tcp")]
1078 relay_sessions: Arc::new(crate::relay_session::RelaySessionRegistry::new()),
1079 });
1080
1081 let hb_availability = Arc::clone(&state.availability);
1084 if self.cfg.enable_mesh {
1086 let pm = Arc::clone(&state.peer_manager);
1087 let node_id = self.cfg.node_id.clone();
1088 let own_endpoint = self.cfg.endpoint.clone();
1089 tokio::spawn(async move {
1090 pm.start(&node_id, &own_endpoint).await;
1091 let interval = pm.gossip_interval();
1092 loop {
1093 tokio::time::sleep(interval).await;
1094 pm.gossip_round().await;
1095 }
1096 });
1097 }
1098
1099 let mut app = Router::new()
1100 .route("/v1/task", post(task_endpoint))
1101 .route("/iicp/health", get(health_endpoint))
1102 .route("/metrics", get(metrics_endpoint));
1103 if self.cfg.enable_mesh {
1104 app = app.route("/v1/peers", post(peers_endpoint));
1105 }
1106 if self.cfg.relay_capable {
1107 app = app.route("/v1/relay", post(relay_endpoint));
1108 }
1109 #[cfg(feature = "iicp-tcp")]
1111 let relay_sessions_arc = Arc::clone(&state.relay_sessions);
1112 let app = app.with_state(state);
1113
1114 let addr: SocketAddr = addr
1115 .parse()
1116 .map_err(|e| IicpError::Node(format!("invalid addr: {e}")))?;
1117
1118 let listener = if addr.is_ipv6() {
1123 let socket = Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))
1124 .map_err(|e| IicpError::Node(format!("socket create: {e}")))?;
1125 socket
1126 .set_only_v6(false)
1127 .map_err(|e| IicpError::Node(format!("set_only_v6: {e}")))?;
1128 socket
1129 .set_reuse_address(true)
1130 .map_err(|e| IicpError::Node(format!("set_reuse_address: {e}")))?;
1131 socket
1132 .bind(&addr.into())
1133 .map_err(|e| IicpError::Node(format!("bind {addr}: {e}")))?;
1134 socket
1135 .listen(1024)
1136 .map_err(|e| IicpError::Node(format!("listen: {e}")))?;
1137 let std_listener: std::net::TcpListener = socket.into();
1138 std_listener
1139 .set_nonblocking(true)
1140 .map_err(|e| IicpError::Node(e.to_string()))?;
1141 TcpListener::from_std(std_listener).map_err(|e| IicpError::Node(e.to_string()))?
1142 } else {
1143 TcpListener::bind(addr)
1144 .await
1145 .map_err(|e| IicpError::Node(e.to_string()))?
1146 };
1147
1148 tracing::info!("IICP node {} listening on {}", self.cfg.node_id, addr);
1149
1150 if let Some(token) = node_token {
1151 let node_id = self.cfg.node_id.clone();
1152 let dir = self.cfg.directory_url.clone();
1153 let http = self.http.clone();
1154 let avail = Arc::clone(&hb_availability);
1155 let max_c = self.cfg.max_concurrent;
1156 let hb_log: Option<Arc<crate::node_log::NodeLog>> =
1158 self.cfg.log_dir.as_deref().and_then(|d| {
1159 crate::node_log::NodeLog::open(d, &node_id)
1160 .map(Arc::new)
1161 .ok()
1162 });
1163 let hb_node_id = node_id.clone();
1164 let hb_tasks_success = Arc::clone(&tasks_success);
1165 let hb_tasks_failed = Arc::clone(&tasks_failed);
1166 let hb_register_payload = self.build_register_payload();
1170 let hb_token_arc = Arc::clone(&self.runtime_token);
1171 let hb_register_url = format!("{}/v1/register", dir.trim_end_matches('/'));
1172 tokio::spawn(async move {
1173 let mut token = token;
1174 let mut seq: u64 = 0;
1175 loop {
1176 tokio::time::sleep(Duration::from_secs(HEARTBEAT_INTERVAL_SECS)).await;
1177 seq += 1;
1178 let ok = hb_tasks_success.swap(0, Ordering::Relaxed);
1182 let fail = hb_tasks_failed.swap(0, Ordering::Relaxed);
1183 match http
1184 .post(format!("{}/v1/heartbeat", dir.trim_end_matches('/')))
1187 .bearer_auth(&token)
1188 .json(&json!({
1189 "node_id": &node_id,
1190 "node_token": &token,
1191 "status": "available",
1192 "max_concurrent": avail.effective_max_concurrent(max_c),
1194 "metrics": if ok > 0 || fail > 0 {
1197 json!({"tasks_success": ok, "tasks_failed": fail})
1198 } else {
1199 json!({})
1200 },
1201 }))
1202 .send()
1203 .await
1204 {
1205 Ok(resp) if resp.status().is_success() => {
1206 if let Some(ref log) = hb_log {
1207 log.write("heartbeat_ok", &hb_node_id, &format!("seq={seq}"));
1208 }
1209 }
1210 Ok(resp) if matches!(resp.status().as_u16(), 401 | 404 | 410) => {
1216 let code = resp.status().as_u16();
1217 tracing::warn!(
1218 "heartbeat rejected ({code}) — node unknown to directory; re-registering"
1219 );
1220 match reregister(&http, &hb_register_url, &hb_register_payload).await {
1221 Some(t) => {
1222 token = t;
1223 if let Ok(mut g) = hb_token_arc.write() {
1224 *g = token.clone();
1225 }
1226 if let Some(ref log) = hb_log {
1227 log.write(
1228 "reregister_ok",
1229 &hb_node_id,
1230 &format!("seq={seq} after_status={code}"),
1231 );
1232 }
1233 }
1234 None => {
1235 tracing::warn!("re-registration failed (after status {code})");
1236 if let Some(ref log) = hb_log {
1237 log.write(
1238 "reregister_fail",
1239 &hb_node_id,
1240 &format!("seq={seq} after_status={code}"),
1241 );
1242 }
1243 }
1244 }
1245 }
1246 Ok(resp) => {
1247 if let Some(ref log) = hb_log {
1248 log.write(
1249 "heartbeat_fail",
1250 &hb_node_id,
1251 &format!("seq={seq} status={}", resp.status().as_u16()),
1252 );
1253 }
1254 }
1255 Err(e) => {
1256 tracing::warn!("heartbeat failed: {e}");
1257 if let Some(ref log) = hb_log {
1258 log.write(
1259 "heartbeat_fail",
1260 &hb_node_id,
1261 &format!("seq={seq} error={e}"),
1262 );
1263 }
1264 }
1265 }
1266 }
1267 });
1268 }
1269
1270 #[cfg(feature = "nat")]
1272 {
1273 let uid_arc = Arc::clone(&shared_pinhole_uid);
1274 let lease_arc = Arc::clone(&shared_pinhole_lease);
1275 tokio::spawn(async move {
1276 loop {
1277 let (_uid, lease) = {
1278 let u = uid_arc.read().ok().and_then(|g| *g);
1279 let l = lease_arc.read().map(|g| *g).unwrap_or(3600);
1280 (u, l)
1281 };
1282 let delay = Duration::from_secs(u64::from((lease / 2).max(60)));
1283 tokio::time::sleep(delay).await;
1284 let uid = match uid_arc.read().ok().and_then(|g| *g) {
1285 Some(u) => u,
1286 None => return,
1287 };
1288 let ok = crate::nat_detection::renew_ipv6_pinhole(uid, lease).await;
1289 if ok {
1290 tracing::debug!("UPnP IPv6 pinhole uid={uid} renewed (lease={lease}s)");
1291 } else {
1292 tracing::warn!("UPnP IPv6 pinhole uid={uid} renewal failed — will retry");
1293 }
1294 }
1295 });
1296 }
1297
1298 #[cfg(feature = "iicp-tcp")]
1300 if self.cfg.relay_capable {
1301 let relay_reg = relay_sessions_arc;
1302 let relay_host_str = bind_host.clone();
1303 let relay_port = self.cfg.relay_accept_port;
1304 tokio::spawn(async move {
1305 let srv = Arc::new(crate::relay_session::RelayAcceptServer::new(
1306 (*relay_reg).clone(),
1307 relay_host_str,
1308 relay_port,
1309 ));
1310 if let Err(e) = srv.serve().await {
1311 tracing::warn!("Relay accept server error: {e}");
1312 }
1313 });
1314 }
1315
1316 #[cfg(feature = "iicp-tcp")]
1318 if let Some(ref ep) = self.cfg.relay_worker_endpoint {
1319 let ep = ep.clone();
1320 let node_id = self.cfg.node_id.clone();
1321 let intent = self.cfg.intent.clone();
1322 let models = self.cfg.model.clone().map(|m| vec![m]).unwrap_or_default();
1323 let handler_fn: crate::relay_worker_client::RelayHandlerFn =
1324 Arc::new(move |task: Value| {
1325 let h = Arc::clone(&handler_for_relay);
1326 Box::pin(async move {
1327 let req = crate::node::TaskRequest {
1328 task_id: task
1329 .get("task_id")
1330 .and_then(|v| v.as_str())
1331 .unwrap_or("")
1332 .to_string(),
1333 intent: task
1334 .get("intent")
1335 .and_then(|v| v.as_str())
1336 .unwrap_or("")
1337 .to_string(),
1338 payload: task.get("payload").cloned().unwrap_or(Value::Null),
1339 constraints: task.get("constraints").cloned(),
1340 auth: task.get("auth").cloned(),
1341 nonce: None,
1342 _trace: None,
1343 };
1344 h(req)
1345 .await
1346 .unwrap_or_else(|e| json!({"error": e.to_string()}))
1347 })
1348 });
1349 let (rhost, rport) = {
1350 if let Some(pos) = ep.rfind(':') {
1351 let port = ep[pos + 1..].parse::<u16>().unwrap_or(9485);
1352 (ep[..pos].to_string(), port)
1353 } else {
1354 (ep.clone(), 9485u16)
1355 }
1356 };
1357 let http_client = self.http.clone();
1360 let dir_url = self.cfg.directory_url.clone();
1361 let on_bind_cb: crate::relay_worker_client::OnBindFn = Arc::new(
1362 move |rh: String, rp: u16, _wid: String| {
1363 let http = http_client.clone();
1364 let dir = dir_url.clone();
1365 Box::pin(async move {
1366 tracing::info!(
1371 "Relay worker bound to relay {}:{} — update directory registration to use relay endpoint",
1372 rh, rp,
1373 );
1374 let _ = (http, dir); })
1376 },
1377 );
1378 tokio::spawn(async move {
1379 let rwc = Arc::new(
1380 crate::relay_worker_client::RelayWorkerClient::new(
1381 node_id, intent, rhost, rport, handler_fn, models,
1382 )
1383 .with_on_bind(on_bind_cb),
1384 );
1385 rwc.run().await;
1386 });
1387 }
1388
1389 axum::serve(listener, app)
1390 .await
1391 .map_err(|e| IicpError::Node(e.to_string()))
1392 }
1393}
1394
// Tests for `build_capabilities`: they pin how many capability entries are
// produced for a list of model names, and which intent, model list and input
// modalities each entry carries.
#[cfg(test)]
mod capability_tests {
    use super::build_capabilities;
    use serde_json::json;

    // Default intent passed to `build_capabilities` in every test.
    const CHAT: &str = "urn:iicp:intent:llm:chat:v1";
    // Intent expected for the embedding model's capability entry.
    const EMBED: &str = "urn:iicp:intent:llm:embedding:v1";

    // Converts string literals into the owned `String` list the function takes.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().copied().map(String::from).collect()
    }

    // One chat model plus one embedding model: two entries, chat first.
    #[test]
    fn chat_plus_embedding_models_advertise_two_intents() {
        let chat_model = "qwen2.5-coder-14b-instruct";
        let embed_model = "text-embedding-nomic-embed-text-v1.5";
        let advertised = build_capabilities(&owned(&[chat_model, embed_model]), CHAT, 4096);

        assert_eq!(advertised.len(), 2, "should advertise chat + embedding");

        let chat_cap = &advertised[0];
        assert_eq!(chat_cap["intent"], CHAT);
        assert_eq!(chat_cap["models"], json!([chat_model]));

        let embed_cap = &advertised[1];
        assert_eq!(embed_cap["intent"], EMBED);
        assert_eq!(embed_cap["models"], json!([embed_model]));
    }

    // A single text-only chat model: one entry with the text modality only.
    #[test]
    fn chat_only_yields_single_capability() {
        let model = "qwen2.5:0.5b";
        let advertised = build_capabilities(&owned(&[model]), CHAT, 4096);

        assert_eq!(advertised.len(), 1);

        let only = &advertised[0];
        assert_eq!(only["intent"], CHAT);
        assert_eq!(only["models"], json!([model]));
        assert_eq!(only["input_modalities"], json!(["text"]));
    }

    // A vision ("-vl-") model gets its own chat entry that adds "image".
    #[test]
    fn vision_model_advertises_image_modality_chat_capability() {
        let text_model = "qwen2.5-coder-14b";
        let vision_model = "qwen/qwen3-vl-8b";
        let advertised = build_capabilities(&owned(&[text_model, vision_model]), CHAT, 4096);

        assert_eq!(
            advertised.len(),
            2,
            "text-chat and vision-chat are distinct capabilities"
        );

        let text_cap = &advertised[0];
        assert_eq!(text_cap["intent"], CHAT);
        assert_eq!(text_cap["input_modalities"], json!(["text"]));
        assert_eq!(text_cap["models"], json!([text_model]));

        let vision_cap = &advertised[1];
        assert_eq!(vision_cap["intent"], CHAT);
        assert_eq!(vision_cap["input_modalities"], json!(["text", "image"]));
        assert_eq!(vision_cap["models"], json!([vision_model]));
    }

    // An "audio" model gets its own chat entry that adds "audio".
    #[test]
    fn audio_model_advertises_audio_modality_chat_capability() {
        let text_model = "qwen2.5:0.5b";
        let audio_model = "qwen2-audio-7b";
        let advertised = build_capabilities(&owned(&[text_model, audio_model]), CHAT, 4096);

        assert_eq!(advertised.len(), 2);
        assert_eq!(advertised[0]["input_modalities"], json!(["text"]));

        let audio_cap = &advertised[1];
        assert_eq!(audio_cap["intent"], CHAT);
        assert_eq!(audio_cap["input_modalities"], json!(["text", "audio"]));
        assert_eq!(audio_cap["models"], json!([audio_model]));
    }

    // An "omni" model is advertised with text, image and audio together.
    #[test]
    fn omni_model_advertises_image_and_audio_modalities() {
        let advertised = build_capabilities(&owned(&["qwen2.5-omni-7b"]), CHAT, 4096);

        assert_eq!(advertised.len(), 1);
        assert_eq!(
            advertised[0]["input_modalities"],
            json!(["text", "image", "audio"])
        );
    }

    // No models at all: a single default-intent entry with an empty model list.
    #[test]
    fn empty_models_yields_default_intent_capability() {
        let advertised = build_capabilities(&[], CHAT, 1024);

        assert_eq!(advertised.len(), 1);

        let fallback = &advertised[0];
        assert_eq!(fallback["intent"], CHAT);
        assert_eq!(fallback["models"], json!([]));
    }
}
1496
// Tests for `reregister`, run against a local mockito HTTP server standing in
// for the directory's registration endpoint.
#[cfg(test)]
mod reregister_tests {
    use super::reregister;
    use serde_json::json;

    // Path on the mock server that plays the registration endpoint.
    const REGISTER_PATH: &str = "/v1/register";

    // A 201 response whose body carries `node_token` yields that token.
    #[tokio::test]
    async fn reregister_returns_fresh_token() {
        let mut directory = mockito::Server::new_async().await;
        let response_body = json!({"node_token": "recovered-xyz"}).to_string();
        let register_mock = directory
            .mock("POST", REGISTER_PATH)
            .with_status(201)
            .with_body(response_body)
            .create_async()
            .await;

        let client = reqwest::Client::new();
        let request = json!({"endpoint": "https://x", "region": "r"});
        let endpoint = format!("{}{}", directory.url(), REGISTER_PATH);

        let token = reregister(&client, &endpoint, &request).await;

        assert_eq!(token.as_deref(), Some("recovered-xyz"));
        // The registration endpoint must actually have been hit.
        register_mock.assert_async().await;
    }

    // A non-success status (500 here) yields `None`.
    #[tokio::test]
    async fn reregister_none_on_failure() {
        let mut directory = mockito::Server::new_async().await;
        // Keep the mock bound for the whole test so it stays registered.
        let _register_mock = directory
            .mock("POST", REGISTER_PATH)
            .with_status(500)
            .create_async()
            .await;

        let client = reqwest::Client::new();
        let endpoint = format!("{}{}", directory.url(), REGISTER_PATH);

        let token = reregister(&client, &endpoint, &json!({})).await;

        assert!(token.is_none());
    }
}