Skip to main content

iicp_client/
client.rs

1// SPDX-License-Identifier: Apache-2.0
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use regex::Regex;
6
7use crate::confidentiality::encrypt_payload;
8use crate::errors::{IicpError, Result};
9use crate::http::{make_traceparent, HttpClient};
10use crate::types::*;
11
12// Compiled once at first use — avoid per-call allocation (fix: rust#3).
13static INTENT_RE: LazyLock<Regex> =
14    LazyLock::new(|| Regex::new(r"^urn:iicp:intent:[a-z0-9_:/-]+$").unwrap());
15
16const MAX_TIMEOUT_MS: u64 = 120_000;
17const MAX_RETRIES: u32 = 3;
18
19/// SSRF guard: return true only if url is safe to use as a node endpoint (#388).
20fn is_ssrf_safe(url: &str) -> bool {
21    let lower = url.to_lowercase();
22    let rest = if let Some(s) = lower.strip_prefix("https://") {
23        s
24    } else if let Some(s) = lower.strip_prefix("http://") {
25        s
26    } else {
27        return false;
28    };
29
30    // Extract host — handles IPv6 [addr]:port and plain host:port/path
31    let host = if rest.starts_with('[') {
32        rest.split(']')
33            .next()
34            .map(|s| s.trim_start_matches('['))
35            .unwrap_or("")
36    } else {
37        rest.split('/')
38            .next()
39            .unwrap_or("")
40            .split(':')
41            .next()
42            .unwrap_or("")
43    };
44
45    if host.is_empty() {
46        return false;
47    }
48    if matches!(host, "localhost" | "0.0.0.0" | "::1" | "::") {
49        return false;
50    }
51    const BLOCKED_SUFFIXES: &[&str] = &[
52        ".local",
53        ".internal",
54        ".lan",
55        ".test",
56        ".invalid",
57        ".localhost",
58    ];
59    if BLOCKED_SUFFIXES.iter().any(|s| host.ends_with(s)) {
60        return false;
61    }
62    // Bare hostname (no dot and no colon = Docker service name; IPv6 has colons)
63    if !host.contains('.') && !host.contains(':') {
64        return false;
65    }
66    if let Ok(addr) = host.parse::<std::net::IpAddr>() {
67        match addr {
68            std::net::IpAddr::V4(v4) => {
69                if v4.is_loopback()
70                    || v4.is_private()
71                    || v4.is_link_local()
72                    || v4.is_broadcast()
73                    || v4.is_unspecified()
74                {
75                    return false;
76                }
77                let o = v4.octets();
78                if o[0] == 100 && (64..=127).contains(&o[1]) {
79                    return false; // CGNAT 100.64/10
80                }
81            }
82            std::net::IpAddr::V6(v6) => {
83                if v6.is_loopback() || v6.is_multicast() || v6.is_unspecified() {
84                    return false;
85                }
86            }
87        }
88    }
89    true
90}
91
92/// Reject model/region strings containing query-separator or newline characters (#388 §FINDING-4-5).
93fn is_safe_query_param(s: &str) -> bool {
94    !s.contains(['&', '=', '\n', '\r', '\0'])
95}
96
97/// IICP client — discover → select → submit (ADR-016 §1).
98pub struct IicpClient {
99    config: ClientConfig,
100    http: HttpClient,
101}
102
103impl IicpClient {
104    /// Construct a client. Enforces SDK-04 (timeout_ms ≤ 120 000).
105    pub fn new(config: ClientConfig) -> Result<Self> {
106        if config.timeout_ms > MAX_TIMEOUT_MS {
107            return Err(IicpError::TimeoutTooLarge(config.timeout_ms));
108        }
109        let http = HttpClient::new(config.timeout_ms, config.node_token.clone())?;
110        Ok(Self { config, http })
111    }
112
113    /// Discover nodes for *intent* (SDK-01). Accepts an optional traceparent for propagation.
114    pub async fn discover(
115        &self,
116        intent: &str,
117        opts: Option<DiscoverOptions>,
118        traceparent: Option<&str>,
119    ) -> Result<NodeList> {
120        self.validate_intent(intent)?;
121        let opts = opts.unwrap_or_default();
122        let base = self.config.directory_url.trim_end_matches('/');
123        let mut url = format!("{base}/v1/discover?intent={intent}");
124        if let Some(region) = opts.region.as_ref().or(self.config.region.as_ref()) {
125            if is_safe_query_param(region) {
126                url.push_str(&format!("&region={region}"));
127            }
128        }
129        if let Some(model) = &opts.model {
130            if is_safe_query_param(model) {
131                url.push_str(&format!("&model={model}"));
132            }
133        }
134        if let Some(rep) = opts.min_reputation {
135            url.push_str(&format!("&min_reputation={rep}"));
136        }
137        url.push_str(&format!("&limit={}", opts.limit.unwrap_or(10)));
138        self.http.get_json(&url, traceparent).await
139    }
140
141    /// Discover → select best node → submit task (SDK-01/02).
142    /// Retries up to MAX_RETRIES on transient errors (SDK-05).
143    /// Generates one W3C traceparent shared across discover + POST (SDK-06).
144    pub async fn submit(&self, mut request: TaskRequest) -> Result<TaskResponse> {
145        self.validate_intent(&request.intent)?;
146        if request.task_id.is_empty() {
147            request.task_id = uuid::Uuid::new_v4().to_string();
148        }
149
150        let tp = make_traceparent(); // SDK-06: shared across discover + node POST
151        let nodes = self.discover(&request.intent, None, Some(&tp)).await?;
152        // Collect up to MAX_RETRIES candidates — fall back to the next on connection errors.
153        let candidates: Vec<_> = nodes
154            .nodes
155            .into_iter()
156            .filter(|n| {
157                if !is_ssrf_safe(&n.endpoint) {
158                    eprintln!(
159                        "[iicp-client] SSRF guard: skipping node {} — endpoint {} is not publicly routable",
160                        &n.node_id[..n.node_id.len().min(8)],
161                        n.endpoint
162                    );
163                    false
164                } else {
165                    true
166                }
167            })
168            .filter(|n| n.available)
169            .take(MAX_RETRIES as usize)
170            .collect();
171
172        if candidates.is_empty() {
173            return Err(IicpError::NoNodes {
174                intent: request.intent.clone(),
175            });
176        }
177
178        let mut last_err: Option<IicpError> = None;
179
180        'nodes: for node in &candidates {
181            // IICP-CX S.16 §5: build body per node (cx_public_key may differ per node)
182            let body: serde_json::Value = if self.config.use_confidentiality {
183                if let Some(ref cx_key) = node.cx_public_key {
184                    let iicp_conf = encrypt_payload(
185                        &request.payload,
186                        cx_key,
187                        &request.task_id,
188                        &request.intent,
189                    )?;
190                    let mut body = serde_json::to_value(&request)?;
191                    if let Some(obj) = body.as_object_mut() {
192                        obj.remove("payload");
193                        obj.insert("iicp_conf".to_string(), serde_json::to_value(iicp_conf)?);
194                    }
195                    body
196                } else {
197                    serde_json::to_value(&request)?
198                }
199            } else {
200                serde_json::to_value(&request)?
201            };
202
203            for attempt in 0..MAX_RETRIES {
204                match self
205                    .http
206                    .post_json(
207                        &format!("{}/v1/task", node.endpoint),
208                        &body,
209                        None,
210                        Some(&tp),
211                    )
212                    .await
213                {
214                    Ok(resp) => return Ok(resp),
215                    Err(e) => {
216                        last_err = Some(e);
217                        let err = last_err.as_ref().unwrap();
218                        if !err.is_transient() {
219                            return Err(last_err.unwrap()); // hard failure, don't retry
220                        }
221                        // Network/connection error → try next node immediately
222                        if matches!(err, IicpError::Http(_)) {
223                            continue 'nodes;
224                        }
225                        // Server 5xx → retry same node with backoff
226                        if attempt < MAX_RETRIES - 1 {
227                            tokio::time::sleep(Duration::from_millis(200 * 2u64.pow(attempt)))
228                                .await;
229                        }
230                    }
231                }
232            }
233        }
234
235        Err(last_err.unwrap_or_else(|| IicpError::NoNodes {
236            intent: request.intent.clone(),
237        }))
238    }
239
240    /// Discover → select best LLM node → submit chat task (SDK-02).
241    pub async fn chat(
242        &self,
243        messages: Vec<ChatMessage>,
244        opts: Option<ChatOptions>,
245    ) -> Result<ChatResponse> {
246        let opts = opts.unwrap_or_default();
247        let mut payload = serde_json::json!({ "messages": messages });
248        if let Some(ref model) = opts.model {
249            payload["model"] = serde_json::Value::String(model.clone());
250        }
251        if let Some(temp) = opts.temperature {
252            payload["temperature"] = serde_json::json!(temp);
253        }
254        let request = TaskRequest {
255            task_id: uuid::Uuid::new_v4().to_string(),
256            intent: "urn:iicp:intent:llm:chat:v1".into(),
257            payload,
258            constraints: Some(TaskConstraints {
259                timeout_ms: opts.timeout_ms,
260                max_tokens: opts.max_tokens,
261                model: opts.model,
262            }),
263            auth: None,
264        };
265        let task_resp = self.submit(request).await?;
266        let node_id = task_resp.metrics.as_ref().and_then(|m| m.node_id.clone());
267        let task_id = task_resp.task_id.clone();
268        let result = task_resp.result.ok_or_else(|| IicpError::Protocol {
269            code: "no_result".into(),
270            message: "Node returned task without a result payload".into(),
271            status: 200,
272        })?;
273        let mut resp: ChatResponse = serde_json::from_value(result)?;
274        resp.task_id = task_id;
275        resp.node_id = node_id;
276        Ok(resp)
277    }
278
279    fn validate_intent(&self, intent: &str) -> Result<()> {
280        if !INTENT_RE.is_match(intent) {
281            return Err(IicpError::InvalidIntent(intent.into()));
282        }
283        Ok(())
284    }
285}