Skip to main content

canic_host/
replica_query.rs

1use crate::icp_config::{
2    DEFAULT_LOCAL_GATEWAY_PORT, configured_local_gateway_port,
3    configured_local_gateway_port_from_root,
4};
5use candid::{CandidType, Decode, Encode, Principal};
6use serde::{Deserialize, Serialize};
7use std::{
8    error::Error,
9    fmt,
10    io::{Read, Write},
11    net::TcpStream,
12    path::Path,
13    time::{SystemTime, UNIX_EPOCH},
14};
15
///
/// ReplicaQueryError
///
/// Failure modes of a direct query against the local replica HTTP API.
///

#[derive(Debug)]
pub enum ReplicaQueryError {
    // I/O failure (for example while connecting to or reading from the replica socket).
    Io(std::io::Error),
    // CBOR encoding of the request or decoding of the response failed.
    Cbor(serde_cbor::Error),
    // JSON rendering of a decoded response failed.
    Json(serde_json::Error),
    // Any other failure, carried as a preformatted message (bad principal text,
    // Candid decode errors, malformed or non-2xx HTTP responses, ...).
    Query(String),
    // The replica answered, but with a status other than "replied".
    Rejected { code: u64, message: String },
}
28
29impl fmt::Display for ReplicaQueryError {
30    // Render local replica query failures as compact operator diagnostics.
31    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32        match self {
33            Self::Io(err) => write!(formatter, "{err}"),
34            Self::Cbor(err) => write!(formatter, "{err}"),
35            Self::Json(err) => write!(formatter, "{err}"),
36            Self::Query(message) => write!(formatter, "{message}"),
37            Self::Rejected { code, message } => {
38                write!(
39                    formatter,
40                    "local replica rejected query: code={code} message={message}"
41                )
42            }
43        }
44    }
45}
46
47impl Error for ReplicaQueryError {
48    // Preserve structured source errors for I/O and serialization failures.
49    fn source(&self) -> Option<&(dyn Error + 'static)> {
50        match self {
51            Self::Io(err) => Some(err),
52            Self::Cbor(err) => Some(err),
53            Self::Json(err) => Some(err),
54            Self::Query(_) | Self::Rejected { .. } => None,
55        }
56    }
57}
58
59impl From<std::io::Error> for ReplicaQueryError {
60    // Convert local socket and process I/O failures.
61    fn from(err: std::io::Error) -> Self {
62        Self::Io(err)
63    }
64}
65
66impl From<serde_cbor::Error> for ReplicaQueryError {
67    // Convert CBOR encode/decode failures.
68    fn from(err: serde_cbor::Error) -> Self {
69        Self::Cbor(err)
70    }
71}
72
73impl From<serde_json::Error> for ReplicaQueryError {
74    // Convert JSON rendering failures.
75    fn from(err: serde_json::Error) -> Self {
76        Self::Json(err)
77    }
78}
79
/// Decide whether queries for `network` should go straight to the local replica.
///
/// Returns `true` when no network is selected, when it is named `local`, or when
/// it is an explicit `http://` URL; every other value returns `false`.
#[must_use]
pub fn should_use_local_replica_query(network: Option<&str>) -> bool {
    match network {
        None => true,
        Some(name) => name == "local" || name.starts_with("http://"),
    }
}
85
86/// Query `canic_ready` directly through the local replica HTTP API.
87pub fn query_ready(network: Option<&str>, canister: &str) -> Result<bool, ReplicaQueryError> {
88    let bytes = local_query(network, canister, "canic_ready")?;
89    Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
90}
91
92/// Query `canic_ready` using the configured port from one ICP root.
93pub fn query_ready_from_root(
94    network: Option<&str>,
95    canister: &str,
96    icp_root: &Path,
97) -> Result<bool, ReplicaQueryError> {
98    let bytes = local_query_from_root(network, canister, "canic_ready", icp_root)?;
99    Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
100}
101
102/// Return true when the local replica HTTP status endpoint is reachable.
103#[must_use]
104pub fn local_replica_status_reachable_from_root(network: Option<&str>, icp_root: &Path) -> bool {
105    get_http_status(&local_replica_endpoint_from_root(network, icp_root)).is_ok()
106}
107
108/// Return the HTTP endpoint Canic should use for a local replica under one ICP root.
109#[must_use]
110pub fn local_replica_endpoint_from_root(network: Option<&str>, icp_root: &Path) -> String {
111    local_replica_endpoint_with_port(
112        network,
113        configured_local_gateway_port_from_root(icp_root).ok(),
114    )
115}
116
117/// Return the replica root key advertised by the local status endpoint.
118pub fn local_replica_root_key_from_root(
119    network: Option<&str>,
120    icp_root: &Path,
121) -> Result<Option<String>, ReplicaQueryError> {
122    let endpoint = local_replica_endpoint_from_root(network, icp_root);
123    let body = get_http_status(&endpoint)?;
124    Ok(parse_local_replica_root_key(&body))
125}
126
127fn parse_local_replica_root_key(body: &[u8]) -> Option<String> {
128    serde_json::from_slice::<serde_json::Value>(body)
129        .ok()
130        .and_then(|value| root_key_from_json(&value))
131        .or_else(|| {
132            serde_cbor::from_slice::<serde_cbor::Value>(body)
133                .ok()
134                .and_then(|value| root_key_from_cbor(&value))
135        })
136}
137
138fn root_key_from_json(value: &serde_json::Value) -> Option<String> {
139    match value {
140        serde_json::Value::String(text) => nonempty_text(text),
141        serde_json::Value::Array(values) => values.iter().find_map(root_key_from_json),
142        serde_json::Value::Object(map) => map
143            .get("root_key")
144            .and_then(root_key_from_json)
145            .or_else(|| map.values().find_map(root_key_from_json)),
146        _ => None,
147    }
148}
149
150fn root_key_from_cbor(value: &serde_cbor::Value) -> Option<String> {
151    match value {
152        serde_cbor::Value::Bytes(bytes) => (!bytes.is_empty()).then(|| hex_bytes(bytes)),
153        serde_cbor::Value::Text(text) => nonempty_text(text),
154        serde_cbor::Value::Array(values) => values.iter().find_map(root_key_from_cbor),
155        serde_cbor::Value::Map(map) => map
156            .iter()
157            .find_map(|(key, value)| match key {
158                serde_cbor::Value::Text(key) if key == "root_key" => root_key_from_cbor(value),
159                _ => None,
160            })
161            .or_else(|| map.values().find_map(root_key_from_cbor)),
162        _ => None,
163    }
164}
165
// Trim surrounding whitespace and return the remainder, or `None` when nothing is left.
fn nonempty_text(text: &str) -> Option<String> {
    match text.trim() {
        "" => None,
        content => Some(content.to_string()),
    }
}
170
// Encode bytes as lowercase hexadecimal, two digits per byte.
fn hex_bytes(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hex = String::with_capacity(bytes.len() * 2);
    for &octet in bytes {
        // High nibble first, then low nibble.
        hex.push(char::from(DIGITS[usize::from(octet >> 4)]));
        hex.push(char::from(DIGITS[usize::from(octet & 0x0f)]));
    }
    hex
}
179
180/// Parse common JSON shapes returned by command-line calls for `canic_ready`.
181#[must_use]
182pub fn parse_ready_json_value(data: &serde_json::Value) -> bool {
183    match data {
184        serde_json::Value::Bool(value) => *value,
185        serde_json::Value::String(value) => value.trim() == "(true)",
186        serde_json::Value::Array(values) => values.iter().any(parse_ready_json_value),
187        serde_json::Value::Object(map) => map.values().any(parse_ready_json_value),
188        _ => false,
189    }
190}
191
192/// Query `canic_subnet_registry` and render JSON in the CLI response shape.
193pub fn query_subnet_registry_json(
194    network: Option<&str>,
195    root: &str,
196) -> Result<String, ReplicaQueryError> {
197    let bytes = local_query(network, root, "canic_subnet_registry")?;
198    let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
199        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
200    let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
201    serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
202}
203
204/// Query `canic_subnet_registry` using the configured port from one ICP root.
205pub fn query_subnet_registry_json_from_root(
206    network: Option<&str>,
207    root: &str,
208    icp_root: &Path,
209) -> Result<String, ReplicaQueryError> {
210    let bytes = local_query_from_root(network, root, "canic_subnet_registry", icp_root)?;
211    let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
212        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
213    let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
214    serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
215}
216
217// Execute one anonymous query call against the local replica.
218fn local_query(
219    network: Option<&str>,
220    canister: &str,
221    method: &str,
222) -> Result<Vec<u8>, ReplicaQueryError> {
223    local_query_with_endpoint(canister, method, local_replica_endpoint(network))
224}
225
226fn local_query_from_root(
227    network: Option<&str>,
228    canister: &str,
229    method: &str,
230    icp_root: &Path,
231) -> Result<Vec<u8>, ReplicaQueryError> {
232    local_query_with_endpoint(
233        canister,
234        method,
235        local_replica_endpoint_from_root(network, icp_root),
236    )
237}
238
239fn local_query_with_endpoint(
240    canister: &str,
241    method: &str,
242    endpoint: String,
243) -> Result<Vec<u8>, ReplicaQueryError> {
244    let canister_id =
245        Principal::from_text(canister).map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
246    let arg = Encode!().map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
247    let sender = Principal::anonymous();
248    let envelope = QueryEnvelope {
249        content: QueryContent {
250            request_type: "query",
251            canister_id: canister_id.as_slice(),
252            method_name: method,
253            arg: &arg,
254            sender: sender.as_slice(),
255            ingress_expiry: ingress_expiry_nanos()?,
256        },
257    };
258    let body = serde_cbor::to_vec(&envelope)?;
259    let response = post_cbor(
260        &endpoint,
261        &format!("/api/v2/canister/{canister}/query"),
262        &body,
263    )?;
264    let query_response = serde_cbor::from_slice::<QueryResponse>(&response)?;
265
266    if query_response.status == "replied" {
267        return query_response
268            .reply
269            .map(|reply| reply.arg)
270            .ok_or_else(|| ReplicaQueryError::Query("missing query reply".to_string()));
271    }
272
273    Err(ReplicaQueryError::Rejected {
274        code: query_response.reject_code.unwrap_or_default(),
275        message: query_response.reject_message.unwrap_or_default(),
276    })
277}
278
279// Resolve the local replica endpoint from explicit URL or the configured ICP CLI local port.
280fn local_replica_endpoint(network: Option<&str>) -> String {
281    local_replica_endpoint_with_port(network, configured_local_gateway_port().ok())
282}
283
284// Format the local replica endpoint from an explicit URL, configured port, or ICP default.
285fn local_replica_endpoint_with_port(network: Option<&str>, configured_port: Option<u16>) -> String {
286    if let Some(network) = network.filter(|network| network.starts_with("http://")) {
287        return network.trim_end_matches('/').to_string();
288    }
289
290    let port = configured_port.unwrap_or(DEFAULT_LOCAL_GATEWAY_PORT);
291    format!("http://127.0.0.1:{port}")
292}
293
294// Return an ingress expiry comfortably in the near future for local queries.
295fn ingress_expiry_nanos() -> Result<u64, ReplicaQueryError> {
296    let now = SystemTime::now()
297        .duration_since(UNIX_EPOCH)
298        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
299    let expiry = now
300        .as_nanos()
301        .saturating_add(5 * 60 * 1_000_000_000)
302        .min(u128::from(u64::MAX));
303    u64::try_from(expiry).map_err(|err| ReplicaQueryError::Query(err.to_string()))
304}
305
306// POST one CBOR request over simple HTTP/1.1 and return the response body.
307fn post_cbor(endpoint: &str, path: &str, body: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
308    let (host, port) = parse_http_endpoint(endpoint)?;
309    let mut stream = TcpStream::connect((host.as_str(), port))?;
310    let request = format!(
311        "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/cbor\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
312        body.len()
313    );
314    stream.write_all(request.as_bytes())?;
315    stream.write_all(body)?;
316
317    let mut response = Vec::new();
318    stream.read_to_end(&mut response)?;
319    split_http_body(&response)
320}
321
322fn get_http_status(endpoint: &str) -> Result<Vec<u8>, ReplicaQueryError> {
323    let (host, port) = parse_http_endpoint(endpoint)?;
324    let mut stream = TcpStream::connect((host.as_str(), port))?;
325    let request =
326        format!("GET /api/v2/status HTTP/1.1\r\nHost: {host}:{port}\r\nConnection: close\r\n\r\n");
327    stream.write_all(request.as_bytes())?;
328
329    let mut response = Vec::new();
330    stream.read_to_end(&mut response)?;
331    split_http_body(&response)
332}
333
334// Parse the limited HTTP endpoints supported by local direct queries.
335fn parse_http_endpoint(endpoint: &str) -> Result<(String, u16), ReplicaQueryError> {
336    let rest = endpoint
337        .strip_prefix("http://")
338        .ok_or_else(|| ReplicaQueryError::Query(format!("unsupported endpoint {endpoint}")))?;
339    let authority = rest.split('/').next().unwrap_or(rest);
340    let (host, port) = authority
341        .rsplit_once(':')
342        .ok_or_else(|| ReplicaQueryError::Query(format!("missing port in {endpoint}")))?;
343    let port = port
344        .parse::<u16>()
345        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
346    Ok((host.to_string(), port))
347}
348
349// Split a simple HTTP response and reject non-2xx status codes.
350fn split_http_body(response: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
351    let marker = b"\r\n\r\n";
352    let Some(index) = response
353        .windows(marker.len())
354        .position(|window| window == marker)
355    else {
356        return Err(ReplicaQueryError::Query(
357            "malformed HTTP response".to_string(),
358        ));
359    };
360    let header = String::from_utf8_lossy(&response[..index]);
361    let status_ok = header
362        .lines()
363        .next()
364        .is_some_and(|status| status.contains(" 2"));
365    if !status_ok {
366        return Err(ReplicaQueryError::Query(header.to_string()));
367    }
368    Ok(response[index + marker.len()..].to_vec())
369}
370
///
/// QueryEnvelope
///
/// Outer request object that is CBOR-encoded and POSTed for a query call.
///

#[derive(Serialize)]
struct QueryEnvelope<'a> {
    // The query request itself; no other envelope fields are sent.
    content: QueryContent<'a>,
}
379
///
/// QueryContent
///
/// Body of one query request. Fields marked with `serde_bytes` are serialized
/// as byte strings rather than as sequences of integers.
///

#[derive(Serialize)]
struct QueryContent<'a> {
    // Request kind; the query path always sets this to "query".
    request_type: &'static str,
    // Raw bytes of the target canister principal.
    #[serde(with = "serde_bytes")]
    canister_id: &'a [u8],
    // Name of the canister method to call.
    method_name: &'a str,
    // Candid-encoded call arguments.
    #[serde(with = "serde_bytes")]
    arg: &'a [u8],
    // Raw bytes of the calling principal.
    #[serde(with = "serde_bytes")]
    sender: &'a [u8],
    // Request expiry in nanoseconds since the Unix epoch.
    ingress_expiry: u64,
}
396
///
/// QueryResponse
///
/// CBOR response to a query call, as far as this module reads it.
///

#[derive(Deserialize)]
struct QueryResponse {
    // Outcome marker; "replied" means `reply` carries the result, any other
    // value is treated as a rejection.
    status: String,
    // Reply payload, expected when `status` is "replied".
    reply: Option<QueryReply>,
    // Rejection code; treated as 0 when absent.
    reject_code: Option<u64>,
    // Rejection message; treated as empty when absent.
    reject_message: Option<String>,
}
408
///
/// QueryReply
///
/// Payload of a successful query response.
///

#[derive(Deserialize)]
struct QueryReply {
    // Raw reply bytes, deserialized from a CBOR byte string; callers decode
    // them as Candid.
    #[serde(with = "serde_bytes")]
    arg: Vec<u8>,
}
418
///
/// SubnetRegistryResponseWire
///
/// Candid shape of a successful `canic_subnet_registry` reply: a list of
/// registry entries.
///

#[derive(CandidType, Deserialize)]
struct SubnetRegistryResponseWire(Vec<SubnetRegistryEntryWire>);
425
426impl SubnetRegistryResponseWire {
427    // Convert direct Candid query output into the command JSON shape the discovery parser accepts.
428    fn to_cli_json(&self) -> serde_json::Value {
429        serde_json::json!({
430            "Ok": self.0.iter().map(SubnetRegistryEntryWire::to_cli_json).collect::<Vec<_>>()
431        })
432    }
433}
434
///
/// SubnetRegistryEntryWire
///
/// One entry of the subnet registry reply.
///

#[derive(CandidType, Deserialize)]
struct SubnetRegistryEntryWire {
    // Principal of the registered canister; rendered as text in CLI JSON.
    pid: Principal,
    // Role name of the registered canister.
    role: String,
    // Detailed canister record attached to this entry.
    record: CanisterInfoWire,
}
445
446impl SubnetRegistryEntryWire {
447    // Convert one registry entry into the command JSON shape used by existing list rendering.
448    fn to_cli_json(&self) -> serde_json::Value {
449        serde_json::json!({
450            "pid": self.pid.to_text(),
451            "role": self.role,
452            "record": self.record.to_cli_json(),
453        })
454    }
455}
456
///
/// CanisterInfoWire
///
/// Canister record carried inside a subnet registry entry.
///

#[derive(CandidType, Deserialize)]
struct CanisterInfoWire {
    // Principal of the canister; rendered as text in CLI JSON.
    pid: Principal,
    // Role name of the canister.
    role: String,
    // Parent canister principal, when one is recorded; rendered as text or null.
    parent_pid: Option<Principal>,
    // Module hash bytes, when recorded; rendered as a JSON array of numbers or null.
    module_hash: Option<Vec<u8>>,
    // Creation timestamp; rendered as a decimal string in CLI JSON.
    // NOTE(review): unit is not visible here (presumably nanoseconds) — confirm.
    created_at: u64,
}
469
470impl CanisterInfoWire {
471    // Convert one canister info record into a CLI-like JSON object.
472    fn to_cli_json(&self) -> serde_json::Value {
473        serde_json::json!({
474            "pid": self.pid.to_text(),
475            "role": self.role,
476            "parent_pid": self.parent_pid.as_ref().map(Principal::to_text),
477            "module_hash": self.module_hash,
478            "created_at": self.created_at.to_string(),
479        })
480    }
481}
482
///
/// CanicErrorWire
///
/// Candid shape of the error side of a `canic_subnet_registry` reply.
///

#[derive(CandidType, Deserialize)]
struct CanicErrorWire {
    // Machine-readable error category.
    code: ErrorCodeWire,
    // Human-readable error description.
    message: String,
}
492
493impl fmt::Display for CanicErrorWire {
494    // Render a compact public API error from a direct local replica query.
495    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
496        write!(formatter, "{:?}: {}", self.code, self.message)
497    }
498}
499
///
/// ErrorCodeWire
///
/// Error category carried by `CanicErrorWire`; its `Debug` name is what appears
/// in rendered error messages.
/// NOTE(review): presumably mirrors the canister-side error code enum, so the
/// variant set must stay in sync with it — confirm.
///

#[derive(CandidType, Debug, Deserialize)]
enum ErrorCodeWire {
    Conflict,
    Forbidden,
    Internal,
    InvalidInput,
    InvariantViolation,
    NotFound,
    PolicyInstanceRequiresSingletonWithDirectory,
    PolicyReplicaRequiresSingletonWithScaling,
    PolicyRoleAlreadyRegistered,
    PolicyShardRequiresSingletonWithSharding,
    PolicySingletonAlreadyRegisteredUnderParent,
    ResourceExhausted,
    Unauthorized,
    Unavailable,
}
521
#[cfg(test)]
mod tests {
    use super::*;

    // Ensure readiness parsing accepts common command-line JSON result shapes.
    #[test]
    fn parse_ready_json_value_accepts_nested_true_shapes() {
        assert!(parse_ready_json_value(&serde_json::json!(true)));
        assert!(parse_ready_json_value(&serde_json::json!({ "Ok": true })));
        assert!(parse_ready_json_value(&serde_json::json!([{ "Ok": true }])));
        assert!(parse_ready_json_value(&serde_json::json!({
            "response_candid": "(true)"
        })));
    }

    // Ensure readiness parsing rejects false and non-boolean result shapes.
    #[test]
    fn parse_ready_json_value_rejects_false_shapes() {
        assert!(!parse_ready_json_value(&serde_json::json!(false)));
        assert!(!parse_ready_json_value(&serde_json::json!({ "Ok": false })));
        assert!(!parse_ready_json_value(&serde_json::json!("true")));
    }

    // Ensure direct local queries use the ICP CLI local endpoint fallback when no project port is configured.
    #[test]
    fn local_replica_endpoint_defaults_to_icp_cli_port() {
        assert_eq!(
            local_replica_endpoint_with_port(None, None),
            "http://127.0.0.1:8000"
        );
        assert_eq!(
            local_replica_endpoint_with_port(None, Some(8001)),
            "http://127.0.0.1:8001"
        );
        assert_eq!(
            local_replica_endpoint_with_port(Some("http://127.0.0.1:9000/"), Some(8001)),
            "http://127.0.0.1:9000"
        );
    }

    // Ensure a JSON status body yields the string stored under `root_key`.
    #[test]
    fn parses_local_replica_root_key_from_json_status() {
        let root_key = parse_local_replica_root_key(br#"{"root_key":"308182"}"#);

        assert_eq!(root_key.as_deref(), Some("308182"));
    }

    // Ensure a CBOR status body yields the `root_key` bytes as lowercase hex.
    #[test]
    fn parses_local_replica_root_key_from_cbor_status() {
        // Minimal status document whose root key is encoded as a CBOR byte string.
        #[derive(Serialize)]
        struct Status {
            #[serde(with = "serde_bytes")]
            root_key: Vec<u8>,
        }

        let body = serde_cbor::to_vec(&Status {
            root_key: vec![0x30, 0x81, 0x82],
        })
        .expect("encode cbor status");
        let root_key = parse_local_replica_root_key(&body);

        assert_eq!(root_key.as_deref(), Some("308182"));
    }

    // Ensure whitespace-only JSON strings and empty CBOR byte strings are not
    // reported as a root key.
    #[test]
    fn rejects_blank_local_replica_root_key_status_values() {
        // Minimal status document whose root key is encoded as a CBOR byte string.
        #[derive(Serialize)]
        struct Status {
            #[serde(with = "serde_bytes")]
            root_key: Vec<u8>,
        }

        assert_eq!(parse_local_replica_root_key(br#"{"root_key":"   "}"#), None);

        let body = serde_cbor::to_vec(&Status { root_key: vec![] })
            .expect("encode empty cbor status root key");

        assert_eq!(parse_local_replica_root_key(&body), None);
    }
}