Skip to main content

canic_host/
replica_query.rs

1use crate::icp_config::{
2    DEFAULT_LOCAL_GATEWAY_PORT, configured_local_gateway_port,
3    configured_local_gateway_port_from_root,
4};
5use candid::{CandidType, Decode, Encode, Principal};
6use serde::{Deserialize, Serialize};
7use std::{
8    error::Error,
9    fmt,
10    io::{Read, Write},
11    net::TcpStream,
12    path::Path,
13    time::{SystemTime, UNIX_EPOCH},
14};
15
16///
17/// ReplicaQueryError
18///
19
20#[derive(Debug)]
21pub enum ReplicaQueryError {
22    Io(std::io::Error),
23    Cbor(serde_cbor::Error),
24    Json(serde_json::Error),
25    Query(String),
26    Rejected { code: u64, message: String },
27}
28
29impl fmt::Display for ReplicaQueryError {
30    // Render local replica query failures as compact operator diagnostics.
31    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32        match self {
33            Self::Io(err) => write!(formatter, "{err}"),
34            Self::Cbor(err) => write!(formatter, "{err}"),
35            Self::Json(err) => write!(formatter, "{err}"),
36            Self::Query(message) => write!(formatter, "{message}"),
37            Self::Rejected { code, message } => {
38                write!(
39                    formatter,
40                    "local replica rejected query: code={code} message={message}"
41                )
42            }
43        }
44    }
45}
46
47impl Error for ReplicaQueryError {
48    // Preserve structured source errors for I/O and serialization failures.
49    fn source(&self) -> Option<&(dyn Error + 'static)> {
50        match self {
51            Self::Io(err) => Some(err),
52            Self::Cbor(err) => Some(err),
53            Self::Json(err) => Some(err),
54            Self::Query(_) | Self::Rejected { .. } => None,
55        }
56    }
57}
58
59impl From<std::io::Error> for ReplicaQueryError {
60    // Convert local socket and process I/O failures.
61    fn from(err: std::io::Error) -> Self {
62        Self::Io(err)
63    }
64}
65
66impl From<serde_cbor::Error> for ReplicaQueryError {
67    // Convert CBOR encode/decode failures.
68    fn from(err: serde_cbor::Error) -> Self {
69        Self::Cbor(err)
70    }
71}
72
73impl From<serde_json::Error> for ReplicaQueryError {
74    // Convert JSON rendering failures.
75    fn from(err: serde_json::Error) -> Self {
76        Self::Json(err)
77    }
78}
79
80/// Return whether the selected network should use direct local replica queries.
81#[must_use]
82pub fn should_use_local_replica_query(network: Option<&str>) -> bool {
83    network.is_none_or(|network| network == "local" || network.starts_with("http://"))
84}
85
86/// Query `canic_ready` directly through the local replica HTTP API.
87pub fn query_ready(network: Option<&str>, canister: &str) -> Result<bool, ReplicaQueryError> {
88    let bytes = local_query(network, canister, "canic_ready")?;
89    Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
90}
91
92/// Query `canic_ready` using the configured port from one ICP root.
93pub fn query_ready_from_root(
94    network: Option<&str>,
95    canister: &str,
96    icp_root: &Path,
97) -> Result<bool, ReplicaQueryError> {
98    let bytes = local_query_from_root(network, canister, "canic_ready", icp_root)?;
99    Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
100}
101
102/// Return true when the local replica HTTP status endpoint is reachable.
103#[must_use]
104pub fn local_replica_status_reachable_from_root(network: Option<&str>, icp_root: &Path) -> bool {
105    get_http_status(&local_replica_endpoint_from_root(network, icp_root)).is_ok()
106}
107
108/// Return the HTTP endpoint Canic should use for a local replica under one ICP root.
109#[must_use]
110pub fn local_replica_endpoint_from_root(network: Option<&str>, icp_root: &Path) -> String {
111    local_replica_endpoint_with_port(
112        network,
113        configured_local_gateway_port_from_root(icp_root).ok(),
114    )
115}
116
117/// Return the replica root key advertised by the local status endpoint.
118pub fn local_replica_root_key_from_root(
119    network: Option<&str>,
120    icp_root: &Path,
121) -> Result<Option<String>, ReplicaQueryError> {
122    let endpoint = local_replica_endpoint_from_root(network, icp_root);
123    let body = get_http_status(&endpoint)?;
124    let value = serde_json::from_slice::<serde_json::Value>(&body)?;
125    Ok(value
126        .get("root_key")
127        .and_then(serde_json::Value::as_str)
128        .filter(|root_key| !root_key.is_empty())
129        .map(str::to_string))
130}
131
132/// Parse common JSON shapes returned by command-line calls for `canic_ready`.
133#[must_use]
134pub fn parse_ready_json_value(data: &serde_json::Value) -> bool {
135    match data {
136        serde_json::Value::Bool(value) => *value,
137        serde_json::Value::String(value) => value.trim() == "(true)",
138        serde_json::Value::Array(values) => values.iter().any(parse_ready_json_value),
139        serde_json::Value::Object(map) => map.values().any(parse_ready_json_value),
140        _ => false,
141    }
142}
143
144/// Query `canic_subnet_registry` and render JSON in the CLI response shape.
145pub fn query_subnet_registry_json(
146    network: Option<&str>,
147    root: &str,
148) -> Result<String, ReplicaQueryError> {
149    let bytes = local_query(network, root, "canic_subnet_registry")?;
150    let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
151        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
152    let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
153    serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
154}
155
156/// Query `canic_subnet_registry` using the configured port from one ICP root.
157pub fn query_subnet_registry_json_from_root(
158    network: Option<&str>,
159    root: &str,
160    icp_root: &Path,
161) -> Result<String, ReplicaQueryError> {
162    let bytes = local_query_from_root(network, root, "canic_subnet_registry", icp_root)?;
163    let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
164        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
165    let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
166    serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
167}
168
169// Execute one anonymous query call against the local replica.
170fn local_query(
171    network: Option<&str>,
172    canister: &str,
173    method: &str,
174) -> Result<Vec<u8>, ReplicaQueryError> {
175    local_query_with_endpoint(network, canister, method, local_replica_endpoint(network))
176}
177
178fn local_query_from_root(
179    network: Option<&str>,
180    canister: &str,
181    method: &str,
182    icp_root: &Path,
183) -> Result<Vec<u8>, ReplicaQueryError> {
184    local_query_with_endpoint(
185        network,
186        canister,
187        method,
188        local_replica_endpoint_from_root(network, icp_root),
189    )
190}
191
192fn local_query_with_endpoint(
193    _network: Option<&str>,
194    canister: &str,
195    method: &str,
196    endpoint: String,
197) -> Result<Vec<u8>, ReplicaQueryError> {
198    let canister_id =
199        Principal::from_text(canister).map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
200    let arg = Encode!().map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
201    let sender = Principal::anonymous();
202    let envelope = QueryEnvelope {
203        content: QueryContent {
204            request_type: "query",
205            canister_id: canister_id.as_slice(),
206            method_name: method,
207            arg: &arg,
208            sender: sender.as_slice(),
209            ingress_expiry: ingress_expiry_nanos()?,
210        },
211    };
212    let body = serde_cbor::to_vec(&envelope)?;
213    let response = post_cbor(
214        &endpoint,
215        &format!("/api/v2/canister/{canister}/query"),
216        &body,
217    )?;
218    let query_response = serde_cbor::from_slice::<QueryResponse>(&response)?;
219
220    if query_response.status == "replied" {
221        return query_response
222            .reply
223            .map(|reply| reply.arg)
224            .ok_or_else(|| ReplicaQueryError::Query("missing query reply".to_string()));
225    }
226
227    Err(ReplicaQueryError::Rejected {
228        code: query_response.reject_code.unwrap_or_default(),
229        message: query_response.reject_message.unwrap_or_default(),
230    })
231}
232
233// Resolve the local replica endpoint from explicit URL or the configured ICP CLI local port.
234fn local_replica_endpoint(network: Option<&str>) -> String {
235    local_replica_endpoint_with_port(network, configured_local_gateway_port().ok())
236}
237
238// Format the local replica endpoint from an explicit URL, configured port, or ICP default.
239fn local_replica_endpoint_with_port(network: Option<&str>, configured_port: Option<u16>) -> String {
240    if let Some(network) = network.filter(|network| network.starts_with("http://")) {
241        return network.trim_end_matches('/').to_string();
242    }
243
244    let port = configured_port.unwrap_or(DEFAULT_LOCAL_GATEWAY_PORT);
245    format!("http://127.0.0.1:{port}")
246}
247
248// Return an ingress expiry comfortably in the near future for local queries.
249fn ingress_expiry_nanos() -> Result<u64, ReplicaQueryError> {
250    let now = SystemTime::now()
251        .duration_since(UNIX_EPOCH)
252        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
253    let expiry = now
254        .as_nanos()
255        .saturating_add(5 * 60 * 1_000_000_000)
256        .min(u128::from(u64::MAX));
257    u64::try_from(expiry).map_err(|err| ReplicaQueryError::Query(err.to_string()))
258}
259
260// POST one CBOR request over simple HTTP/1.1 and return the response body.
261fn post_cbor(endpoint: &str, path: &str, body: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
262    let (host, port) = parse_http_endpoint(endpoint)?;
263    let mut stream = TcpStream::connect((host.as_str(), port))?;
264    let request = format!(
265        "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/cbor\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
266        body.len()
267    );
268    stream.write_all(request.as_bytes())?;
269    stream.write_all(body)?;
270
271    let mut response = Vec::new();
272    stream.read_to_end(&mut response)?;
273    split_http_body(&response)
274}
275
276fn get_http_status(endpoint: &str) -> Result<Vec<u8>, ReplicaQueryError> {
277    let (host, port) = parse_http_endpoint(endpoint)?;
278    let mut stream = TcpStream::connect((host.as_str(), port))?;
279    let request =
280        format!("GET /api/v2/status HTTP/1.1\r\nHost: {host}:{port}\r\nConnection: close\r\n\r\n");
281    stream.write_all(request.as_bytes())?;
282
283    let mut response = Vec::new();
284    stream.read_to_end(&mut response)?;
285    split_http_body(&response)
286}
287
288// Parse the limited HTTP endpoints supported by local direct queries.
289fn parse_http_endpoint(endpoint: &str) -> Result<(String, u16), ReplicaQueryError> {
290    let rest = endpoint
291        .strip_prefix("http://")
292        .ok_or_else(|| ReplicaQueryError::Query(format!("unsupported endpoint {endpoint}")))?;
293    let authority = rest.split('/').next().unwrap_or(rest);
294    let (host, port) = authority
295        .rsplit_once(':')
296        .ok_or_else(|| ReplicaQueryError::Query(format!("missing port in {endpoint}")))?;
297    let port = port
298        .parse::<u16>()
299        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
300    Ok((host.to_string(), port))
301}
302
303// Split a simple HTTP response and reject non-2xx status codes.
304fn split_http_body(response: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
305    let marker = b"\r\n\r\n";
306    let Some(index) = response
307        .windows(marker.len())
308        .position(|window| window == marker)
309    else {
310        return Err(ReplicaQueryError::Query(
311            "malformed HTTP response".to_string(),
312        ));
313    };
314    let header = String::from_utf8_lossy(&response[..index]);
315    let status_ok = header
316        .lines()
317        .next()
318        .is_some_and(|status| status.contains(" 2"));
319    if !status_ok {
320        return Err(ReplicaQueryError::Query(header.to_string()));
321    }
322    Ok(response[index + marker.len()..].to_vec())
323}
324
325///
326/// QueryEnvelope
327///
328
329#[derive(Serialize)]
330struct QueryEnvelope<'a> {
331    content: QueryContent<'a>,
332}
333
334///
335/// QueryContent
336///
337
338#[derive(Serialize)]
339struct QueryContent<'a> {
340    request_type: &'static str,
341    #[serde(with = "serde_bytes")]
342    canister_id: &'a [u8],
343    method_name: &'a str,
344    #[serde(with = "serde_bytes")]
345    arg: &'a [u8],
346    #[serde(with = "serde_bytes")]
347    sender: &'a [u8],
348    ingress_expiry: u64,
349}
350
351///
352/// QueryResponse
353///
354
355#[derive(Deserialize)]
356struct QueryResponse {
357    status: String,
358    reply: Option<QueryReply>,
359    reject_code: Option<u64>,
360    reject_message: Option<String>,
361}
362
363///
364/// QueryReply
365///
366
367#[derive(Deserialize)]
368struct QueryReply {
369    #[serde(with = "serde_bytes")]
370    arg: Vec<u8>,
371}
372
373///
374/// SubnetRegistryResponseWire
375///
376
377#[derive(CandidType, Deserialize)]
378struct SubnetRegistryResponseWire(Vec<SubnetRegistryEntryWire>);
379
380impl SubnetRegistryResponseWire {
381    // Convert direct Candid query output into the command JSON shape the discovery parser accepts.
382    fn to_cli_json(&self) -> serde_json::Value {
383        serde_json::json!({
384            "Ok": self.0.iter().map(SubnetRegistryEntryWire::to_cli_json).collect::<Vec<_>>()
385        })
386    }
387}
388
389///
390/// SubnetRegistryEntryWire
391///
392
393#[derive(CandidType, Deserialize)]
394struct SubnetRegistryEntryWire {
395    pid: Principal,
396    role: String,
397    record: CanisterInfoWire,
398}
399
400impl SubnetRegistryEntryWire {
401    // Convert one registry entry into the command JSON shape used by existing list rendering.
402    fn to_cli_json(&self) -> serde_json::Value {
403        serde_json::json!({
404            "pid": self.pid.to_text(),
405            "role": self.role,
406            "record": self.record.to_cli_json(),
407        })
408    }
409}
410
411///
412/// CanisterInfoWire
413///
414
415#[derive(CandidType, Deserialize)]
416struct CanisterInfoWire {
417    pid: Principal,
418    role: String,
419    parent_pid: Option<Principal>,
420    module_hash: Option<Vec<u8>>,
421    created_at: u64,
422}
423
424impl CanisterInfoWire {
425    // Convert one canister info record into a CLI-like JSON object.
426    fn to_cli_json(&self) -> serde_json::Value {
427        serde_json::json!({
428            "pid": self.pid.to_text(),
429            "role": self.role,
430            "parent_pid": self.parent_pid.as_ref().map(Principal::to_text),
431            "module_hash": self.module_hash,
432            "created_at": self.created_at.to_string(),
433        })
434    }
435}
436
437///
438/// CanicErrorWire
439///
440
441#[derive(CandidType, Deserialize)]
442struct CanicErrorWire {
443    code: ErrorCodeWire,
444    message: String,
445}
446
447impl fmt::Display for CanicErrorWire {
448    // Render a compact public API error from a direct local replica query.
449    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
450        write!(formatter, "{:?}: {}", self.code, self.message)
451    }
452}
453
454///
455/// ErrorCodeWire
456///
457
458#[derive(CandidType, Debug, Deserialize)]
459enum ErrorCodeWire {
460    Conflict,
461    Forbidden,
462    Internal,
463    InvalidInput,
464    InvariantViolation,
465    NotFound,
466    PolicyInstanceRequiresSingletonWithDirectory,
467    PolicyReplicaRequiresSingletonWithScaling,
468    PolicyRoleAlreadyRegistered,
469    PolicyShardRequiresSingletonWithSharding,
470    PolicySingletonAlreadyRegisteredUnderParent,
471    ResourceExhausted,
472    Unauthorized,
473    Unavailable,
474}
475
476#[cfg(test)]
477mod tests {
478    use super::*;
479
480    // Ensure readiness parsing accepts common command-line JSON result shapes.
481    #[test]
482    fn parse_ready_json_value_accepts_nested_true_shapes() {
483        assert!(parse_ready_json_value(&serde_json::json!(true)));
484        assert!(parse_ready_json_value(&serde_json::json!({ "Ok": true })));
485        assert!(parse_ready_json_value(&serde_json::json!([{ "Ok": true }])));
486        assert!(parse_ready_json_value(&serde_json::json!({
487            "response_candid": "(true)"
488        })));
489    }
490
491    // Ensure readiness parsing rejects false and non-boolean result shapes.
492    #[test]
493    fn parse_ready_json_value_rejects_false_shapes() {
494        assert!(!parse_ready_json_value(&serde_json::json!(false)));
495        assert!(!parse_ready_json_value(&serde_json::json!({ "Ok": false })));
496        assert!(!parse_ready_json_value(&serde_json::json!("true")));
497    }
498
499    // Ensure direct local queries use the ICP CLI local endpoint fallback when no project port is configured.
500    #[test]
501    fn local_replica_endpoint_defaults_to_icp_cli_port() {
502        assert_eq!(
503            local_replica_endpoint_with_port(None, None),
504            "http://127.0.0.1:8000"
505        );
506        assert_eq!(
507            local_replica_endpoint_with_port(None, Some(8001)),
508            "http://127.0.0.1:8001"
509        );
510        assert_eq!(
511            local_replica_endpoint_with_port(Some("http://127.0.0.1:9000/"), Some(8001)),
512            "http://127.0.0.1:9000"
513        );
514    }
515}