1use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use axum::{
17 extract::State,
18 http::{HeaderMap, StatusCode},
19 response::{IntoResponse, Response},
20 routing::{get, post},
21 Json, Router,
22};
23use reqwest::Client;
24use serde::{Deserialize, Serialize};
25use serde_json::{json, Value};
26use tokio::net::TcpListener;
27use tokio::sync::Mutex;
28
29use crate::errors::{IicpError, Result};
30
/// Directory base URL that `NodeConfig::new` uses as the default `directory_url`.
const DEFAULT_DIRECTORY: &str = "https://iicp.network/api";
/// Seconds between heartbeats sent by the background loop in `IicpNode::serve`.
const HEARTBEAT_INTERVAL_SECS: u64 = 30;
/// Seconds a request nonce stays in the replay cache used by `task_endpoint`.
const NONCE_TTL_SECS: u64 = 300;
34
/// Configuration for an [`IicpNode`]: identity, advertised capabilities and
/// limits, directory location, transport details and optional features.
///
/// Start from [`NodeConfig::new`] and override individual fields as needed.
#[derive(Debug, Clone)]
pub struct NodeConfig {
    /// Node identifier. Sent on register (only when non-empty) and on every
    /// heartbeat, and reported by `/iicp/health`.
    pub node_id: String,
    /// Base URL of this node as advertised to the directory. Replaced by the
    /// public endpoint from NAT detection when the node is reachable.
    pub endpoint: String,
    /// Intent served by this node; advertised in the registered capability
    /// and reported by `/iicp/health`.
    pub intent: String,
    /// Primary model name; listed first in the registered `models`.
    pub model: Option<String>,
    /// Region label. Registration falls back to `"eu-central"` and the health
    /// endpoint to `"unknown"` when this is unset.
    pub region: Option<String>,
    /// Extra model names appended (deduplicated) after `model` in the
    /// registered `models` list.
    pub capabilities: Vec<String>,
    /// Directory base URL for register, deregister, heartbeat and the peer
    /// manager. Defaults to `DEFAULT_DIRECTORY`.
    pub directory_url: String,
    /// Timeout in milliseconds; the node's HTTP client uses this plus a 2 s
    /// margin.
    pub timeout_ms: u64,
    /// Configured concurrency cap, before availability windows are applied.
    pub max_concurrent: usize,
    /// Advertised as `limits.tokens_per_min` on registration.
    pub tokens_per_min: u32,
    /// Advertised as `max_tokens` in the registered capability.
    pub max_tokens: u32,
    /// Transport endpoint, advertised on registration when set.
    pub transport_endpoint: Option<String>,
    /// Transport method name (e.g. `"direct"`, `"upnp_mapped"`), advertised on
    /// registration when set.
    pub transport_method: Option<String>,
    /// NAT type label, advertised on registration when set.
    pub nat_type: Option<String>,
    /// Free-form transport metadata, advertised on registration when set.
    pub transport_metadata: Option<serde_json::Value>,
    /// Exposure mode, advertised on registration when set.
    pub exposure_mode: Option<String>,
    /// Cooperative-inference policy; when `None`, registration falls back to
    /// `crate::cip_policy::get_cip_policy()`.
    pub cip_policy: Option<std::sync::Arc<crate::cip_policy::CooperativeInferencePolicy>>,
    /// Pricing configuration; when set, a pricing block is added to the
    /// registration payload.
    pub pricing: Option<crate::pricing::PricingConfig>,
    /// Node HMAC key. Sent on registration when non-empty and handed to the
    /// peer manager; when empty, a key returned by the directory is adopted
    /// as the runtime key instead.
    pub node_hmac_key: String,
    /// Availability windows fed to `AvailabilityEvaluator`, which derives the
    /// effective concurrency cap from `max_concurrent`.
    pub availability_windows: Vec<crate::availability::Window>,
    /// Reject tasks whose `task_id` has already been seen.
    pub enable_idempotency: bool,
    /// Enable peer gossip and the `POST /v1/peers` route.
    pub enable_mesh: bool,
    /// Enable the `POST /v1/relay` route and, with the `iicp-tcp` feature,
    /// the relay accept server.
    pub relay_capable: bool,
    /// Port for the relay accept server; also passed to the peer manager.
    pub relay_accept_port: u16,
    /// `host:port` of a relay to connect out to as a worker (`iicp-tcp`
    /// feature only).
    pub relay_worker_endpoint: Option<String>,
}
106
107impl NodeConfig {
108 pub fn new(
109 node_id: impl Into<String>,
110 endpoint: impl Into<String>,
111 intent: impl Into<String>,
112 ) -> Self {
113 Self {
114 node_id: node_id.into(),
115 endpoint: endpoint.into(),
116 intent: intent.into(),
117 model: None,
118 region: None,
119 capabilities: vec![],
120 directory_url: DEFAULT_DIRECTORY.into(),
121 timeout_ms: 5_000,
122 max_concurrent: 4,
123 tokens_per_min: 10_000,
124 max_tokens: 8_192,
125 transport_endpoint: None,
126 transport_method: None,
127 nat_type: None,
128 transport_metadata: None,
129 exposure_mode: None,
130 cip_policy: None,
131 pricing: None,
132 node_hmac_key: String::new(),
133 availability_windows: Vec::new(),
134 enable_idempotency: false,
135 enable_mesh: false,
136 relay_capable: false,
137 relay_accept_port: 9485,
138 relay_worker_endpoint: None,
139 }
140 }
141}
142
/// Body of `POST /v1/task`. It is also built from relayed task JSON by the
/// relay worker handler in `IicpNode::serve`.
#[derive(Debug, Deserialize)]
pub struct TaskRequest {
    /// Caller-chosen task identifier. Echoed in the response and checked for
    /// duplicates when idempotency is enabled.
    pub task_id: String,
    /// Requested intent.
    pub intent: String,
    /// Task input, handed to the user handler as part of this request.
    pub payload: Value,
    /// Optional constraints; `task_endpoint` reads `qos_class` from here
    /// (default `"best_effort"`).
    pub constraints: Option<Value>,
    /// Optional auth data; not inspected by the endpoint code in this file.
    pub auth: Option<Value>,
    /// Optional replay-protection nonce. A repeat within `NONCE_TTL_SECS` is
    /// rejected with 409.
    pub nonce: Option<String>,
    /// Trace context, filled from the `traceparent` header. It is never read
    /// from the request body.
    #[serde(skip_deserializing)]
    pub _trace: Option<Value>,
}
155
/// JSON body returned by `POST /v1/task` after the handler has run.
#[derive(Debug, Serialize)]
pub struct TaskResponse {
    /// The request's `task_id`.
    pub task_id: String,
    /// `"completed"` on success, `"error"` when the handler failed.
    pub status: String,
    /// Handler output; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    /// Error details (`{"message": ...}`); omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<Value>,
}
165
/// Type-erased task handler: takes a [`TaskRequest`] and returns a boxed,
/// `Send` future resolving to the task result.
///
/// `IicpNode::serve` wraps the caller's closure into this type. The same
/// value is shared by the HTTP endpoint and, with `iicp-tcp`, by the relay
/// worker.
pub type TaskHandlerFn = Arc<
    dyn Fn(
            TaskRequest,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send>>
        + Send
        + Sync,
>;
173
/// Shared state for the axum handlers, built once by `IicpNode::serve`.
struct AppState {
    /// User task handler invoked by `task_endpoint`.
    handler: TaskHandlerFn,
    // Identity values reported by `/iicp/health`.
    node_id: String,
    region: String,
    intent: String,
    model: String,
    /// Tasks currently admitted. Incremented in `admit`, decremented when a
    /// task finishes or is rejected after admission.
    active_jobs: Arc<AtomicUsize>,
    /// Configured concurrency cap, before availability windows apply.
    max_concurrent: usize,
    /// Derives the effective concurrency cap from `max_concurrent`.
    availability: Arc<crate::availability::AvailabilityEvaluator>,
    /// Duplicate-`task_id` detector, consulted only when `enable_idempotency`.
    idempotency: Arc<crate::idempotency::IdempotencyGuard>,
    enable_idempotency: bool,
    /// Peer list and exchange-signature checks for `/v1/peers`; target
    /// lookup for `/v1/relay`.
    peer_manager: Arc<crate::peer_manager::PeerManager>,
    /// HTTP client used to forward relayed tasks to peers.
    http: reqwest::Client,
    /// Recently seen nonces with their insertion time. Entries older than
    /// `NONCE_TTL_SECS` are pruned whenever a request carries a nonce.
    nonce_cache: Arc<Mutex<HashMap<String, Instant>>>,
    /// IPv6 pinhole id reported by `/iicp/health`; shared with the renewal task.
    pinhole_uid: Arc<std::sync::RwLock<Option<u32>>>,
    /// Pinhole lease in seconds, reported by `/iicp/health`.
    pinhole_lease_seconds: Arc<std::sync::RwLock<u32>>,
    /// Registry shared with the relay accept server. `relay_endpoint` looks
    /// sessions up in it by target node id.
    #[cfg(feature = "iicp-tcp")]
    relay_sessions: Arc<crate::relay_session::RelaySessionRegistry>,
}
195
196async fn health_endpoint(State(state): State<Arc<AppState>>) -> impl IntoResponse {
199 let active = state.active_jobs.load(Ordering::Relaxed);
200 let uid = state.pinhole_uid.read().ok().and_then(|g| *g);
201 let lease = state
202 .pinhole_lease_seconds
203 .read()
204 .map(|g| *g)
205 .unwrap_or(3600);
206 let pinhole_state = if let Some(uid) = uid {
207 json!({ "active": true, "unique_id": uid, "lease_seconds": lease })
208 } else {
209 json!({ "active": false })
210 };
211 let eff_max = state
212 .availability
213 .effective_max_concurrent(state.max_concurrent);
214 Json(json!({
215 "status": "ok",
216 "node_id": state.node_id,
217 "region": state.region,
218 "load": (active as f64 / state.max_concurrent.max(1) as f64),
219 "active_jobs": active,
220 "max_concurrent": state.max_concurrent,
221 "effective_max_concurrent": eff_max,
222 "available": active < eff_max,
223 "model": state.model,
224 "intent": state.intent,
225 "pinhole_state": pinhole_state,
226 }))
227}
228
229async fn metrics_endpoint() -> Response {
232 #[cfg(feature = "metrics")]
233 {
234 use prometheus::{Encoder, TextEncoder};
235 let encoder = TextEncoder::new();
236 let mf = prometheus::gather();
237 let mut buf = Vec::new();
238 if encoder.encode(&mf, &mut buf).is_ok() {
239 return (
240 StatusCode::OK,
241 [(
242 axum::http::header::CONTENT_TYPE,
243 "text/plain; version=0.0.4",
244 )],
245 buf,
246 )
247 .into_response();
248 }
249 }
250 (
251 StatusCode::SERVICE_UNAVAILABLE,
252 "metrics feature not enabled",
253 )
254 .into_response()
255}
256
257async fn peers_endpoint(
260 State(state): State<Arc<AppState>>,
261 headers: HeaderMap,
262 body: axum::body::Bytes,
263) -> Response {
264 let sig = headers
265 .get("x-iicp-signature")
266 .and_then(|v| v.to_str().ok());
267 if !state.peer_manager.verify_exchange(&body, sig) {
268 return (
269 StatusCode::UNAUTHORIZED,
270 Json(json!({"error":{"code":"IICP-E012","message":"invalid_signature"}})),
271 )
272 .into_response();
273 }
274 if let Ok(parsed) = serde_json::from_slice::<Value>(&body) {
275 if let Some(arr) = parsed.get("known_peers").and_then(Value::as_array) {
276 let dicts: Vec<Value> = arr.iter().filter(|p| p.is_object()).cloned().collect();
277 state.peer_manager.merge_peers(&dicts);
278 }
279 }
280 let peers: Vec<Value> = state
281 .peer_manager
282 .get_peers()
283 .iter()
284 .map(|p| {
285 json!({
286 "node_id": p.node_id,
287 "endpoint": p.endpoint,
288 "region": p.region,
289 "last_seen": p.last_seen,
290 })
291 })
292 .collect();
293 Json(json!({ "peers": peers })).into_response()
294}
295
296async fn relay_endpoint(
299 State(state): State<Arc<AppState>>,
300 Json(payload): Json<Value>,
301) -> Response {
302 let target_id = payload
303 .get("target_node_id")
304 .and_then(Value::as_str)
305 .unwrap_or("");
306 let task = payload.get("task");
307 if target_id.is_empty() || task.is_none() {
308 return (
309 StatusCode::UNPROCESSABLE_ENTITY,
310 Json(
311 json!({"error":{"code":"IICP-E000","message":"target_node_id and task required"}}),
312 ),
313 )
314 .into_response();
315 }
316 let task_val = task.expect("checked above").clone();
317
318 #[cfg(feature = "iicp-tcp")]
320 if let Some(session) = state.relay_sessions.get(target_id) {
321 match session.forward_task(&task_val, 120).await {
322 Ok(result) => {
323 let task_id = task_val
324 .get("task_id")
325 .and_then(Value::as_str)
326 .unwrap_or("");
327 return Json(json!({
328 "task_id": task_id,
329 "status": "completed",
330 "result": result
331 }))
332 .into_response();
333 }
334 Err(e) => {
335 return (
336 StatusCode::BAD_GATEWAY,
337 Json(json!({"error":{"code":"IICP-E031","message":format!("relay session forward failed: {e}")}})),
338 )
339 .into_response();
340 }
341 }
342 }
343
344 let target = match state.peer_manager.relay_target(target_id) {
346 Some(t) => t,
347 None => {
348 return (
349 StatusCode::NOT_FOUND,
350 Json(json!({"error":{"code":"IICP-E030","message":"target not in peer list and not a bound relay worker"}})),
351 )
352 .into_response();
353 }
354 };
355 let url = format!("{}/v1/task", target.endpoint.trim_end_matches('/'));
356 match state
357 .http
358 .post(&url)
359 .timeout(Duration::from_secs(120))
360 .json(&task_val)
361 .send()
362 .await
363 {
364 Ok(resp) => {
365 let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::OK);
366 let bytes = resp.bytes().await.unwrap_or_default();
367 (status, bytes).into_response()
368 }
369 Err(e) => (
370 StatusCode::BAD_GATEWAY,
371 Json(json!({"error":{"code":"IICP-E031","message":format!("relay failed: {e}")}})),
372 )
373 .into_response(),
374 }
375}
376
377async fn admit(state: &AppState, qos: &str) -> bool {
384 let cap = state
387 .availability
388 .effective_max_concurrent(state.max_concurrent);
389 let prev = state.active_jobs.fetch_add(1, Ordering::Relaxed);
390 if prev < cap {
391 return true;
392 }
393 state.active_jobs.fetch_sub(1, Ordering::Relaxed);
394 if !crate::scheduler::is_queue_eligible(qos) {
395 return false;
396 }
397 let deadline = Instant::now() + crate::scheduler::QUEUE_WAIT;
398 while Instant::now() < deadline {
399 tokio::time::sleep(Duration::from_millis(50)).await;
400 let cap = state
401 .availability
402 .effective_max_concurrent(state.max_concurrent);
403 let prev = state.active_jobs.fetch_add(1, Ordering::Relaxed);
404 if prev < cap {
405 return true;
406 }
407 state.active_jobs.fetch_sub(1, Ordering::Relaxed);
408 }
409 false
410}
411
412async fn task_endpoint(
413 State(state): State<Arc<AppState>>,
414 headers: HeaderMap,
415 Json(mut req): Json<TaskRequest>,
416) -> Response {
417 let qos = req
419 .constraints
420 .as_ref()
421 .and_then(|c| c.get("qos_class"))
422 .and_then(|v| v.as_str())
423 .unwrap_or("best_effort")
424 .to_string();
425 if !admit(&state, &qos).await {
426 return (
427 StatusCode::TOO_MANY_REQUESTS,
428 [("Retry-After", "2"), ("Content-Type", "application/json")],
429 Json(json!({
430 "error": {
431 "code": "IICP-E021",
432 "message": "capacity_exceeded",
433 "qos_class": qos,
434 "retry_after_ms": 2000,
435 }
436 })),
437 )
438 .into_response();
439 }
440
441 if let Some(ref nonce) = req.nonce {
443 let mut cache = state.nonce_cache.lock().await;
444 cache.retain(|_, inserted_at| inserted_at.elapsed().as_secs() < NONCE_TTL_SECS);
445 if cache.contains_key(nonce) {
446 state.active_jobs.fetch_sub(1, Ordering::Relaxed);
447 return (
448 StatusCode::CONFLICT,
449 Json(json!({
450 "error": { "code": "IICP-E011", "message": "replay_detected" }
451 })),
452 )
453 .into_response();
454 }
455 cache.insert(nonce.clone(), Instant::now());
456 }
457
458 if state.enable_idempotency && !state.idempotency.check_and_register(&req.task_id) {
461 state.active_jobs.fetch_sub(1, Ordering::Relaxed);
462 return (
463 StatusCode::CONFLICT,
464 Json(json!({
465 "error": { "code": "IICP-E010", "message": "duplicate_task" }
466 })),
467 )
468 .into_response();
469 }
470
471 if let Some(tp) = headers.get("traceparent").and_then(|v| v.to_str().ok()) {
473 req._trace = Some(json!({ "traceparent": tp }));
474 }
475
476 let task_id = req.task_id.clone();
477 let result = {
482 let span = tracing::info_span!(
483 "iicp.task.execute",
484 "iicp.task_id" = %task_id,
485 "iicp.intent" = %req.intent,
486 );
487 let _guard = span.enter();
488 (state.handler)(req).await
489 };
490 state.active_jobs.fetch_sub(1, Ordering::Relaxed);
491
492 match result {
493 Ok(value) => Json(TaskResponse {
494 task_id,
495 status: "completed".into(),
496 result: Some(value),
497 error: None,
498 })
499 .into_response(),
500 Err(e) => (
501 StatusCode::INTERNAL_SERVER_ERROR,
502 Json(TaskResponse {
503 task_id,
504 status: "error".into(),
505 result: None,
506 error: Some(json!({ "message": e.to_string() })),
507 }),
508 )
509 .into_response(),
510 }
511}
512
/// An IICP node: a directory client (register / heartbeat / deregister) plus
/// the HTTP server started by [`IicpNode::serve`].
pub struct IicpNode {
    /// Node configuration.
    cfg: NodeConfig,
    /// HTTP client for directory calls; a clone is used to forward relayed tasks.
    http: Client,
    /// HMAC key in effect: `cfg.node_hmac_key`, or the key issued by the
    /// directory on `register()` when the configured one is empty.
    runtime_hmac_key: std::sync::RwLock<String>,
    /// Node token from the last successful `register()`. `deregister()` uses
    /// it when no token is passed.
    runtime_token: std::sync::RwLock<String>,
    /// IPv6 pinhole id recorded by `apply_nat_profile` (`nat` feature).
    // NOTE(review): `serve` reads this field unconditionally, so the
    // `dead_code` allow looks vestigial — confirm before removing.
    #[allow(dead_code)]
    pinhole_uid: std::sync::RwLock<Option<u32>>,
    /// IPv6 pinhole lease in seconds; initialised to 3600 by `new`.
    #[allow(dead_code)]
    pinhole_lease_seconds: std::sync::RwLock<u32>,
}
534
535impl IicpNode {
536 pub fn new(cfg: NodeConfig) -> Self {
537 let http = Client::builder()
538 .timeout(Duration::from_millis(cfg.timeout_ms + 2_000))
539 .use_rustls_tls()
540 .build()
541 .expect("failed to build HTTP client");
542 let runtime_hmac_key = std::sync::RwLock::new(cfg.node_hmac_key.clone());
543 Self {
544 cfg,
545 http,
546 runtime_hmac_key,
547 runtime_token: std::sync::RwLock::new(String::new()),
548 pinhole_uid: std::sync::RwLock::new(None),
549 pinhole_lease_seconds: std::sync::RwLock::new(3600),
550 }
551 }
552
553 pub fn node_hmac_key(&self) -> String {
556 self.runtime_hmac_key.read().expect("poisoned").clone()
557 }
558
559 pub fn cfg(&self) -> &NodeConfig {
563 &self.cfg
564 }
565
566 pub fn set_relay_worker_endpoint(&mut self, endpoint: String) {
570 self.cfg.relay_worker_endpoint = Some(endpoint);
571 }
572
573 #[cfg(feature = "nat")]
584 pub fn apply_nat_profile(&mut self, profile: &crate::nat_detection::NatProfile) {
585 if profile.is_reachable() {
586 if let Some(pub_ep) = &profile.public_endpoint {
587 self.cfg.endpoint = pub_ep.clone();
588 }
589 }
590 if let Some(tep) = &profile.transport_endpoint {
591 self.cfg.transport_endpoint = Some(tep.clone());
592 }
593 let tm = match profile.transport_method {
594 crate::nat_detection::TransportMethod::Direct => Some("direct"),
595 crate::nat_detection::TransportMethod::UpnpMapped => Some("upnp_mapped"),
596 crate::nat_detection::TransportMethod::StunHolePunch => Some("stun_hole_punch"),
597 crate::nat_detection::TransportMethod::TurnRelay => Some("turn_relay"),
598 crate::nat_detection::TransportMethod::ExternalTunnel => Some("external_tunnel"),
599 crate::nat_detection::TransportMethod::Unreachable => None,
600 };
601 if let Some(name) = tm {
602 self.cfg.transport_method = Some(name.into());
603 }
604 if self.cfg.nat_type.is_none() {
605 self.cfg.nat_type = Some("unknown".into());
606 }
607 let tail: Vec<&str> = profile
608 .detection_log
609 .iter()
610 .rev()
611 .take(1)
612 .map(|s| s.as_str())
613 .collect();
614 self.cfg.transport_metadata = Some(serde_json::json!({
615 "tier": profile.tier,
616 "detection_log_tail": tail,
617 }));
618 self.cfg.exposure_mode = Some(
621 crate::qualify::qualify_service(profile)
622 .exposure_mode
623 .to_string(),
624 );
625 if let Some(v6) = &profile.ipv6 {
627 if v6.pinhole_active {
628 if let Some(uid) = v6.pinhole_unique_id {
629 if let Ok(mut slot) = self.pinhole_uid.write() {
630 *slot = Some(uid);
631 }
632 }
633 if let Some(lease) = v6.pinhole_lease_seconds {
634 if let Ok(mut slot) = self.pinhole_lease_seconds.write() {
635 *slot = lease;
636 }
637 }
638 }
639 }
640 }
641
642 #[cfg(feature = "nat")]
644 pub async fn revoke_pinhole(&self) -> bool {
645 let uid = match self.pinhole_uid.write() {
646 Ok(mut slot) => slot.take(),
647 Err(_) => None,
648 };
649 match uid {
650 Some(uid) => crate::nat_detection::delete_ipv6_pinhole(uid).await,
651 None => false,
652 }
653 }
654
655 pub async fn deregister(&self, node_token: Option<&str>) -> Result<()> {
663 let stashed = self.runtime_token.read().expect("poisoned").clone();
664 let token = node_token.map(str::to_string).unwrap_or(stashed);
665 if token.is_empty() {
666 return Err(crate::errors::IicpError::Node(
667 "deregister() requires a node_token (none stashed — call register() first)".into(),
668 ));
669 }
670 let url = format!(
671 "{}/v1/register",
672 self.cfg.directory_url.trim_end_matches('/')
673 );
674 let resp = self
675 .http
676 .delete(&url)
677 .bearer_auth(&token)
678 .json(&serde_json::json!({"node_id": self.cfg.node_id}))
679 .send()
680 .await?;
681 let status = resp.status();
682 if !status.is_success() && status.as_u16() != 404 {
683 return Err(crate::errors::IicpError::Node(format!(
684 "Deregister failed: {status}"
685 )));
686 }
687 Ok(())
688 }
689
690 pub async fn register(&self) -> Result<String> {
697 let mut models: Vec<String> = match &self.cfg.model {
700 Some(m) => vec![m.clone()],
701 None => Vec::new(),
702 };
703 for cap in &self.cfg.capabilities {
704 if !models.contains(cap) {
705 models.push(cap.clone());
706 }
707 }
708 let region = self
709 .cfg
710 .region
711 .clone()
712 .unwrap_or_else(|| "eu-central".to_string());
713
714 let mut payload = json!({
715 "endpoint": self.cfg.endpoint,
716 "region": region,
717 "capabilities": [{
718 "intent": self.cfg.intent,
719 "models": models,
720 "max_tokens": self.cfg.max_tokens,
721 }],
722 "limits": {
723 "max_concurrent": self.cfg.max_concurrent,
724 "tokens_per_min": self.cfg.tokens_per_min,
725 },
726 });
727 if !self.cfg.node_id.is_empty() {
728 payload["node_id"] = json!(self.cfg.node_id);
729 }
730 if let Some(t) = &self.cfg.transport_endpoint {
732 payload["transport_endpoint"] = json!(t);
733 }
734 if let Some(m) = &self.cfg.transport_method {
737 payload["transport_method"] = json!(m);
738 }
739 if let Some(n) = &self.cfg.nat_type {
740 payload["nat_type"] = json!(n);
741 }
742 if let Some(md) = &self.cfg.transport_metadata {
743 payload["transport_metadata"] = md.clone();
744 }
745 if let Some(e) = &self.cfg.exposure_mode {
747 payload["exposure_mode"] = json!(e);
748 }
749
750 payload["sdk_language"] = json!("rust");
754 payload["sdk_version"] = json!(env!("CARGO_PKG_VERSION"));
755
756 let policy_arc = self
759 .cfg
760 .cip_policy
761 .clone()
762 .unwrap_or_else(crate::cip_policy::get_cip_policy);
763 if let Some(block) = policy_arc.as_register_policy_block() {
764 payload["policy"] = block;
765 }
766
767 if let Some(pricing) = &self.cfg.pricing {
769 let hmac_key = self.runtime_hmac_key.read().expect("poisoned").clone();
770 payload["pricing"] = crate::pricing::build_pricing_block(pricing, &hmac_key);
771 }
772 if !self.cfg.node_hmac_key.is_empty() {
773 payload["node_hmac_key"] = json!(self.cfg.node_hmac_key);
774 }
775
776 let resp = self
777 .http
778 .post(format!(
779 "{}/v1/register",
780 self.cfg.directory_url.trim_end_matches('/')
781 ))
782 .json(&payload)
783 .send()
784 .await
785 .map_err(|e| IicpError::Node(e.to_string()))?;
786
787 if !resp.status().is_success() {
788 return Err(IicpError::Node(format!(
789 "register failed: {}",
790 resp.status()
791 )));
792 }
793 let data: Value = resp
794 .json()
795 .await
796 .map_err(|e| IicpError::Node(e.to_string()))?;
797 let token = data["node_token"]
798 .as_str()
799 .or_else(|| data["token"].as_str())
800 .ok_or_else(|| IicpError::Node(format!("no node_token in response: {data}")))?;
801 *self.runtime_token.write().expect("poisoned") = token.to_string();
803 if self.cfg.node_hmac_key.is_empty() {
807 if let Some(dir_key) = data["node_hmac_key"].as_str() {
808 if !dir_key.is_empty() {
809 let mut guard = self.runtime_hmac_key.write().expect("poisoned");
810 *guard = dir_key.to_string();
811 }
812 }
813 }
814 Ok(token.to_string())
815 }
816
817 pub async fn heartbeat(&self, node_token: &str) -> Result<()> {
819 let resp = self
820 .http
821 .post(format!(
825 "{}/v1/heartbeat",
826 self.cfg.directory_url.trim_end_matches('/')
827 ))
828 .bearer_auth(node_token)
831 .json(&json!({
832 "node_id": self.cfg.node_id,
833 "node_token": node_token,
834 "status": "available",
835 "max_concurrent": crate::availability::AvailabilityEvaluator::new(
837 self.cfg.availability_windows.clone(),
838 )
839 .effective_max_concurrent(self.cfg.max_concurrent),
840 }))
841 .send()
842 .await
843 .map_err(|e| IicpError::Node(e.to_string()))?;
844
845 if !resp.status().is_success() {
846 return Err(IicpError::Node(format!(
847 "heartbeat failed: {}",
848 resp.status()
849 )));
850 }
851 Ok(())
852 }
853
854 pub async fn serve<F, Fut>(
859 &self,
860 handler: F,
861 addr: &str,
862 node_token: Option<String>,
863 ) -> Result<()>
864 where
865 F: Fn(TaskRequest) -> Fut + Send + Sync + 'static,
866 Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
867 {
868 let handler: TaskHandlerFn = Arc::new(move |req| Box::pin(handler(req)));
869 #[cfg(feature = "iicp-tcp")]
871 let handler_for_relay = Arc::clone(&handler);
872 #[cfg(feature = "iicp-tcp")]
874 let bind_host: String = addr.split(':').next().unwrap_or("0.0.0.0").to_string();
875 let active_jobs = Arc::new(AtomicUsize::new(0));
876 let nonce_cache = Arc::new(Mutex::new(HashMap::new()));
877 let shared_pinhole_uid: Arc<std::sync::RwLock<Option<u32>>> = Arc::new(
879 std::sync::RwLock::new(self.pinhole_uid.read().ok().and_then(|g| *g)),
880 );
881 let shared_pinhole_lease: Arc<std::sync::RwLock<u32>> = Arc::new(std::sync::RwLock::new(
882 self.pinhole_lease_seconds
883 .read()
884 .map(|g| *g)
885 .unwrap_or(3600),
886 ));
887
888 let state = Arc::new(AppState {
889 handler,
890 node_id: self.cfg.node_id.clone(),
891 region: self.cfg.region.clone().unwrap_or_else(|| "unknown".into()),
892 intent: self.cfg.intent.clone(),
893 model: self.cfg.model.clone().unwrap_or_default(),
894 active_jobs,
895 max_concurrent: self.cfg.max_concurrent,
896 availability: Arc::new(crate::availability::AvailabilityEvaluator::new(
897 self.cfg.availability_windows.clone(),
898 )),
899 idempotency: Arc::new(crate::idempotency::IdempotencyGuard::default()),
900 enable_idempotency: self.cfg.enable_idempotency,
901 peer_manager: Arc::new(crate::peer_manager::PeerManager::with_opts(
902 self.cfg.directory_url.clone(),
903 self.cfg.node_hmac_key.clone(),
904 crate::peer_manager::PeerManagerOpts {
905 relay_capable: self.cfg.relay_capable,
906 relay_accept_port: self.cfg.relay_accept_port,
907 },
908 )),
909 http: self.http.clone(),
910 nonce_cache,
911 pinhole_uid: Arc::clone(&shared_pinhole_uid),
912 pinhole_lease_seconds: Arc::clone(&shared_pinhole_lease),
913 #[cfg(feature = "iicp-tcp")]
914 relay_sessions: Arc::new(crate::relay_session::RelaySessionRegistry::new()),
915 });
916
917 let hb_availability = Arc::clone(&state.availability);
920 if self.cfg.enable_mesh {
922 let pm = Arc::clone(&state.peer_manager);
923 let node_id = self.cfg.node_id.clone();
924 let own_endpoint = self.cfg.endpoint.clone();
925 tokio::spawn(async move {
926 pm.start(&node_id, &own_endpoint).await;
927 let interval = pm.gossip_interval();
928 loop {
929 tokio::time::sleep(interval).await;
930 pm.gossip_round().await;
931 }
932 });
933 }
934
935 let mut app = Router::new()
936 .route("/v1/task", post(task_endpoint))
937 .route("/iicp/health", get(health_endpoint))
938 .route("/metrics", get(metrics_endpoint));
939 if self.cfg.enable_mesh {
940 app = app.route("/v1/peers", post(peers_endpoint));
941 }
942 if self.cfg.relay_capable {
943 app = app.route("/v1/relay", post(relay_endpoint));
944 }
945 #[cfg(feature = "iicp-tcp")]
947 let relay_sessions_arc = Arc::clone(&state.relay_sessions);
948 let app = app.with_state(state);
949
950 let addr: SocketAddr = addr
951 .parse()
952 .map_err(|e| IicpError::Node(format!("invalid addr: {e}")))?;
953 let listener = TcpListener::bind(addr)
954 .await
955 .map_err(|e| IicpError::Node(e.to_string()))?;
956
957 tracing::info!("IICP node {} listening on {}", self.cfg.node_id, addr);
958
959 if let Some(token) = node_token {
960 let node_id = self.cfg.node_id.clone();
961 let dir = self.cfg.directory_url.clone();
962 let http = self.http.clone();
963 let avail = Arc::clone(&hb_availability);
964 let max_c = self.cfg.max_concurrent;
965 tokio::spawn(async move {
966 loop {
967 tokio::time::sleep(Duration::from_secs(HEARTBEAT_INTERVAL_SECS)).await;
968 if let Err(e) = http
969 .post(format!("{}/v1/heartbeat", dir.trim_end_matches('/')))
972 .bearer_auth(&token)
973 .json(&json!({
974 "node_id": &node_id,
975 "node_token": &token,
976 "status": "available",
977 "max_concurrent": avail.effective_max_concurrent(max_c),
979 }))
980 .send()
981 .await
982 {
983 tracing::warn!("heartbeat failed: {e}");
984 }
985 }
986 });
987 }
988
989 #[cfg(feature = "nat")]
991 {
992 let uid_arc = Arc::clone(&shared_pinhole_uid);
993 let lease_arc = Arc::clone(&shared_pinhole_lease);
994 tokio::spawn(async move {
995 loop {
996 let (_uid, lease) = {
997 let u = uid_arc.read().ok().and_then(|g| *g);
998 let l = lease_arc.read().map(|g| *g).unwrap_or(3600);
999 (u, l)
1000 };
1001 let delay = Duration::from_secs(u64::from((lease / 2).max(60)));
1002 tokio::time::sleep(delay).await;
1003 let uid = match uid_arc.read().ok().and_then(|g| *g) {
1004 Some(u) => u,
1005 None => return,
1006 };
1007 let ok = crate::nat_detection::renew_ipv6_pinhole(uid, lease).await;
1008 if ok {
1009 tracing::debug!("UPnP IPv6 pinhole uid={uid} renewed (lease={lease}s)");
1010 } else {
1011 tracing::warn!("UPnP IPv6 pinhole uid={uid} renewal failed — will retry");
1012 }
1013 }
1014 });
1015 }
1016
1017 #[cfg(feature = "iicp-tcp")]
1019 if self.cfg.relay_capable {
1020 let relay_reg = relay_sessions_arc;
1021 let relay_host_str = bind_host.clone();
1022 let relay_port = self.cfg.relay_accept_port;
1023 tokio::spawn(async move {
1024 let srv = Arc::new(crate::relay_session::RelayAcceptServer::new(
1025 (*relay_reg).clone(),
1026 relay_host_str,
1027 relay_port,
1028 ));
1029 if let Err(e) = srv.serve().await {
1030 tracing::warn!("Relay accept server error: {e}");
1031 }
1032 });
1033 }
1034
1035 #[cfg(feature = "iicp-tcp")]
1037 if let Some(ref ep) = self.cfg.relay_worker_endpoint {
1038 let ep = ep.clone();
1039 let node_id = self.cfg.node_id.clone();
1040 let intent = self.cfg.intent.clone();
1041 let models = self.cfg.model.clone().map(|m| vec![m]).unwrap_or_default();
1042 let handler_fn: crate::relay_worker_client::RelayHandlerFn =
1043 Arc::new(move |task: Value| {
1044 let h = Arc::clone(&handler_for_relay);
1045 Box::pin(async move {
1046 let req = crate::node::TaskRequest {
1047 task_id: task
1048 .get("task_id")
1049 .and_then(|v| v.as_str())
1050 .unwrap_or("")
1051 .to_string(),
1052 intent: task
1053 .get("intent")
1054 .and_then(|v| v.as_str())
1055 .unwrap_or("")
1056 .to_string(),
1057 payload: task.get("payload").cloned().unwrap_or(Value::Null),
1058 constraints: task.get("constraints").cloned(),
1059 auth: task.get("auth").cloned(),
1060 nonce: None,
1061 _trace: None,
1062 };
1063 h(req)
1064 .await
1065 .unwrap_or_else(|e| json!({"error": e.to_string()}))
1066 })
1067 });
1068 let (rhost, rport) = {
1069 if let Some(pos) = ep.rfind(':') {
1070 let port = ep[pos + 1..].parse::<u16>().unwrap_or(9485);
1071 (ep[..pos].to_string(), port)
1072 } else {
1073 (ep.clone(), 9485u16)
1074 }
1075 };
1076 let http_client = self.http.clone();
1079 let dir_url = self.cfg.directory_url.clone();
1080 let on_bind_cb: crate::relay_worker_client::OnBindFn = Arc::new(
1081 move |rh: String, rp: u16, _wid: String| {
1082 let http = http_client.clone();
1083 let dir = dir_url.clone();
1084 Box::pin(async move {
1085 tracing::info!(
1090 "Relay worker bound to relay {}:{} — update directory registration to use relay endpoint",
1091 rh, rp,
1092 );
1093 let _ = (http, dir); })
1095 },
1096 );
1097 tokio::spawn(async move {
1098 let rwc = Arc::new(
1099 crate::relay_worker_client::RelayWorkerClient::new(
1100 node_id, intent, rhost, rport, handler_fn, models,
1101 )
1102 .with_on_bind(on_bind_cb),
1103 );
1104 rwc.run().await;
1105 });
1106 }
1107
1108 axum::serve(listener, app)
1109 .await
1110 .map_err(|e| IicpError::Node(e.to_string()))
1111 }
1112}