Skip to main content

canic_host/
replica_query.rs

1use crate::icp_config::{
2    DEFAULT_LOCAL_GATEWAY_PORT, configured_local_gateway_port,
3    configured_local_gateway_port_from_root,
4};
5use candid::{CandidType, Decode, Encode, Principal};
6use serde::{Deserialize, Serialize};
7use std::{
8    error::Error,
9    fmt,
10    io::{Read, Write},
11    net::TcpStream,
12    path::Path,
13    time::{SystemTime, UNIX_EPOCH},
14};
15
///
/// ReplicaQueryError
///
/// Failure modes reported by the direct local replica query helpers in this module.
///

#[derive(Debug)]
pub enum ReplicaQueryError {
    /// Socket or other I/O failure, converted from `std::io::Error`.
    Io(std::io::Error),
    /// CBOR encode/decode failure, converted from `serde_cbor::Error`.
    Cbor(serde_cbor::Error),
    /// JSON rendering failure, converted from `serde_json::Error`.
    Json(serde_json::Error),
    /// Any other query failure, carried as a preformatted message.
    Query(String),
    /// The replica answered with a status other than `"replied"`.
    Rejected { code: u64, message: String },
}
28
29impl fmt::Display for ReplicaQueryError {
30    // Render local replica query failures as compact operator diagnostics.
31    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32        match self {
33            Self::Io(err) => write!(formatter, "{err}"),
34            Self::Cbor(err) => write!(formatter, "{err}"),
35            Self::Json(err) => write!(formatter, "{err}"),
36            Self::Query(message) => write!(formatter, "{message}"),
37            Self::Rejected { code, message } => {
38                write!(
39                    formatter,
40                    "local replica rejected query: code={code} message={message}"
41                )
42            }
43        }
44    }
45}
46
47impl Error for ReplicaQueryError {
48    // Preserve structured source errors for I/O and serialization failures.
49    fn source(&self) -> Option<&(dyn Error + 'static)> {
50        match self {
51            Self::Io(err) => Some(err),
52            Self::Cbor(err) => Some(err),
53            Self::Json(err) => Some(err),
54            Self::Query(_) | Self::Rejected { .. } => None,
55        }
56    }
57}
58
59impl From<std::io::Error> for ReplicaQueryError {
60    // Convert local socket and process I/O failures.
61    fn from(err: std::io::Error) -> Self {
62        Self::Io(err)
63    }
64}
65
66impl From<serde_cbor::Error> for ReplicaQueryError {
67    // Convert CBOR encode/decode failures.
68    fn from(err: serde_cbor::Error) -> Self {
69        Self::Cbor(err)
70    }
71}
72
73impl From<serde_json::Error> for ReplicaQueryError {
74    // Convert JSON rendering failures.
75    fn from(err: serde_json::Error) -> Self {
76        Self::Json(err)
77    }
78}
79
/// Decide whether queries for `network` should go straight to a local replica.
///
/// An absent network, the name `"local"`, and any `http://` URL all qualify.
#[must_use]
pub fn should_use_local_replica_query(network: Option<&str>) -> bool {
    match network {
        None => true,
        Some(name) => name == "local" || name.starts_with("http://"),
    }
}
85
86/// Query `canic_ready` directly through the local replica HTTP API.
87pub fn query_ready(network: Option<&str>, canister: &str) -> Result<bool, ReplicaQueryError> {
88    let bytes = local_query(network, canister, "canic_ready")?;
89    Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
90}
91
92/// Query `canic_ready` using the configured port from one ICP root.
93pub fn query_ready_from_root(
94    network: Option<&str>,
95    canister: &str,
96    icp_root: &Path,
97) -> Result<bool, ReplicaQueryError> {
98    let bytes = local_query_from_root(network, canister, "canic_ready", icp_root)?;
99    Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
100}
101
102/// Return true when the local replica HTTP status endpoint is reachable.
103#[must_use]
104pub fn local_replica_status_reachable_from_root(network: Option<&str>, icp_root: &Path) -> bool {
105    get_http_status(&local_replica_endpoint_from_root(network, icp_root)).is_ok()
106}
107
108/// Return the HTTP endpoint Canic should use for a local replica under one ICP root.
109#[must_use]
110pub fn local_replica_endpoint_from_root(network: Option<&str>, icp_root: &Path) -> String {
111    local_replica_endpoint_with_port(
112        network,
113        configured_local_gateway_port_from_root(icp_root).ok(),
114    )
115}
116
117/// Return the replica root key advertised by the local status endpoint.
118pub fn local_replica_root_key_from_root(
119    network: Option<&str>,
120    icp_root: &Path,
121) -> Result<Option<String>, ReplicaQueryError> {
122    let endpoint = local_replica_endpoint_from_root(network, icp_root);
123    let body = get_http_status(&endpoint)?;
124    Ok(parse_local_replica_root_key(&body))
125}
126
127fn parse_local_replica_root_key(body: &[u8]) -> Option<String> {
128    serde_json::from_slice::<serde_json::Value>(body)
129        .ok()
130        .and_then(|value| root_key_from_json(&value))
131        .or_else(|| {
132            serde_cbor::from_slice::<serde_cbor::Value>(body)
133                .ok()
134                .and_then(|value| root_key_from_cbor(&value))
135        })
136}
137
138fn root_key_from_json(value: &serde_json::Value) -> Option<String> {
139    match value {
140        serde_json::Value::String(text) => nonempty_text(text),
141        serde_json::Value::Array(values) => values.iter().find_map(root_key_from_json),
142        serde_json::Value::Object(map) => map
143            .get("root_key")
144            .and_then(root_key_from_json)
145            .or_else(|| map.values().find_map(root_key_from_json)),
146        _ => None,
147    }
148}
149
150fn root_key_from_cbor(value: &serde_cbor::Value) -> Option<String> {
151    match value {
152        serde_cbor::Value::Bytes(bytes) => (!bytes.is_empty()).then(|| hex_bytes(bytes)),
153        serde_cbor::Value::Text(text) => nonempty_text(text),
154        serde_cbor::Value::Array(values) => values.iter().find_map(root_key_from_cbor),
155        serde_cbor::Value::Map(map) => map
156            .iter()
157            .find_map(|(key, value)| match key {
158                serde_cbor::Value::Text(key) if key == "root_key" => root_key_from_cbor(value),
159                _ => None,
160            })
161            .or_else(|| map.values().find_map(root_key_from_cbor)),
162        _ => None,
163    }
164}
165
// Trim surrounding whitespace and return the remainder, or `None` if nothing is left.
fn nonempty_text(text: &str) -> Option<String> {
    match text.trim() {
        "" => None,
        kept => Some(kept.to_owned()),
    }
}
170
// Encode bytes as lowercase hexadecimal, two digits per byte.
fn hex_bytes(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
179
180/// Parse common JSON shapes returned by command-line calls for `canic_ready`.
181#[must_use]
182pub fn parse_ready_json_value(data: &serde_json::Value) -> bool {
183    match data {
184        serde_json::Value::Bool(value) => *value,
185        serde_json::Value::String(value) => value.trim() == "(true)",
186        serde_json::Value::Array(values) => values.iter().any(parse_ready_json_value),
187        serde_json::Value::Object(map) => map.values().any(parse_ready_json_value),
188        _ => false,
189    }
190}
191
192/// Query `canic_subnet_registry` and render JSON in the CLI response shape.
193pub fn query_subnet_registry_json(
194    network: Option<&str>,
195    root: &str,
196) -> Result<String, ReplicaQueryError> {
197    let bytes = local_query(network, root, "canic_subnet_registry")?;
198    let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
199        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
200    let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
201    serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
202}
203
204/// Query `canic_subnet_registry` using the configured port from one ICP root.
205pub fn query_subnet_registry_json_from_root(
206    network: Option<&str>,
207    root: &str,
208    icp_root: &Path,
209) -> Result<String, ReplicaQueryError> {
210    let bytes = local_query_from_root(network, root, "canic_subnet_registry", icp_root)?;
211    let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
212        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
213    let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
214    serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
215}
216
217// Execute one anonymous query call against the local replica.
218fn local_query(
219    network: Option<&str>,
220    canister: &str,
221    method: &str,
222) -> Result<Vec<u8>, ReplicaQueryError> {
223    local_query_with_endpoint(network, canister, method, local_replica_endpoint(network))
224}
225
226fn local_query_from_root(
227    network: Option<&str>,
228    canister: &str,
229    method: &str,
230    icp_root: &Path,
231) -> Result<Vec<u8>, ReplicaQueryError> {
232    local_query_with_endpoint(
233        network,
234        canister,
235        method,
236        local_replica_endpoint_from_root(network, icp_root),
237    )
238}
239
240fn local_query_with_endpoint(
241    _network: Option<&str>,
242    canister: &str,
243    method: &str,
244    endpoint: String,
245) -> Result<Vec<u8>, ReplicaQueryError> {
246    let canister_id =
247        Principal::from_text(canister).map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
248    let arg = Encode!().map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
249    let sender = Principal::anonymous();
250    let envelope = QueryEnvelope {
251        content: QueryContent {
252            request_type: "query",
253            canister_id: canister_id.as_slice(),
254            method_name: method,
255            arg: &arg,
256            sender: sender.as_slice(),
257            ingress_expiry: ingress_expiry_nanos()?,
258        },
259    };
260    let body = serde_cbor::to_vec(&envelope)?;
261    let response = post_cbor(
262        &endpoint,
263        &format!("/api/v2/canister/{canister}/query"),
264        &body,
265    )?;
266    let query_response = serde_cbor::from_slice::<QueryResponse>(&response)?;
267
268    if query_response.status == "replied" {
269        return query_response
270            .reply
271            .map(|reply| reply.arg)
272            .ok_or_else(|| ReplicaQueryError::Query("missing query reply".to_string()));
273    }
274
275    Err(ReplicaQueryError::Rejected {
276        code: query_response.reject_code.unwrap_or_default(),
277        message: query_response.reject_message.unwrap_or_default(),
278    })
279}
280
281// Resolve the local replica endpoint from explicit URL or the configured ICP CLI local port.
282fn local_replica_endpoint(network: Option<&str>) -> String {
283    local_replica_endpoint_with_port(network, configured_local_gateway_port().ok())
284}
285
286// Format the local replica endpoint from an explicit URL, configured port, or ICP default.
287fn local_replica_endpoint_with_port(network: Option<&str>, configured_port: Option<u16>) -> String {
288    if let Some(network) = network.filter(|network| network.starts_with("http://")) {
289        return network.trim_end_matches('/').to_string();
290    }
291
292    let port = configured_port.unwrap_or(DEFAULT_LOCAL_GATEWAY_PORT);
293    format!("http://127.0.0.1:{port}")
294}
295
296// Return an ingress expiry comfortably in the near future for local queries.
297fn ingress_expiry_nanos() -> Result<u64, ReplicaQueryError> {
298    let now = SystemTime::now()
299        .duration_since(UNIX_EPOCH)
300        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
301    let expiry = now
302        .as_nanos()
303        .saturating_add(5 * 60 * 1_000_000_000)
304        .min(u128::from(u64::MAX));
305    u64::try_from(expiry).map_err(|err| ReplicaQueryError::Query(err.to_string()))
306}
307
308// POST one CBOR request over simple HTTP/1.1 and return the response body.
309fn post_cbor(endpoint: &str, path: &str, body: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
310    let (host, port) = parse_http_endpoint(endpoint)?;
311    let mut stream = TcpStream::connect((host.as_str(), port))?;
312    let request = format!(
313        "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/cbor\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
314        body.len()
315    );
316    stream.write_all(request.as_bytes())?;
317    stream.write_all(body)?;
318
319    let mut response = Vec::new();
320    stream.read_to_end(&mut response)?;
321    split_http_body(&response)
322}
323
324fn get_http_status(endpoint: &str) -> Result<Vec<u8>, ReplicaQueryError> {
325    let (host, port) = parse_http_endpoint(endpoint)?;
326    let mut stream = TcpStream::connect((host.as_str(), port))?;
327    let request =
328        format!("GET /api/v2/status HTTP/1.1\r\nHost: {host}:{port}\r\nConnection: close\r\n\r\n");
329    stream.write_all(request.as_bytes())?;
330
331    let mut response = Vec::new();
332    stream.read_to_end(&mut response)?;
333    split_http_body(&response)
334}
335
336// Parse the limited HTTP endpoints supported by local direct queries.
337fn parse_http_endpoint(endpoint: &str) -> Result<(String, u16), ReplicaQueryError> {
338    let rest = endpoint
339        .strip_prefix("http://")
340        .ok_or_else(|| ReplicaQueryError::Query(format!("unsupported endpoint {endpoint}")))?;
341    let authority = rest.split('/').next().unwrap_or(rest);
342    let (host, port) = authority
343        .rsplit_once(':')
344        .ok_or_else(|| ReplicaQueryError::Query(format!("missing port in {endpoint}")))?;
345    let port = port
346        .parse::<u16>()
347        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
348    Ok((host.to_string(), port))
349}
350
351// Split a simple HTTP response and reject non-2xx status codes.
352fn split_http_body(response: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
353    let marker = b"\r\n\r\n";
354    let Some(index) = response
355        .windows(marker.len())
356        .position(|window| window == marker)
357    else {
358        return Err(ReplicaQueryError::Query(
359            "malformed HTTP response".to_string(),
360        ));
361    };
362    let header = String::from_utf8_lossy(&response[..index]);
363    let status_ok = header
364        .lines()
365        .next()
366        .is_some_and(|status| status.contains(" 2"));
367    if !status_ok {
368        return Err(ReplicaQueryError::Query(header.to_string()));
369    }
370    Ok(response[index + marker.len()..].to_vec())
371}
372
///
/// QueryEnvelope
///
/// Outer request wrapper serialized to CBOR and posted to the replica query endpoint.
///

#[derive(Serialize)]
struct QueryEnvelope<'a> {
    // The query call itself; this envelope carries no other fields.
    content: QueryContent<'a>,
}
381
///
/// QueryContent
///
/// Fields of one query call. Byte-slice fields go through `serde_bytes`.
///

#[derive(Serialize)]
struct QueryContent<'a> {
    // Request kind; this module always sends "query".
    request_type: &'static str,
    // Raw principal bytes of the target canister.
    #[serde(with = "serde_bytes")]
    canister_id: &'a [u8],
    // Name of the canister method to call.
    method_name: &'a str,
    // Candid-encoded call arguments.
    #[serde(with = "serde_bytes")]
    arg: &'a [u8],
    // Raw principal bytes of the caller.
    #[serde(with = "serde_bytes")]
    sender: &'a [u8],
    // Expiry timestamp in nanoseconds since the Unix epoch.
    ingress_expiry: u64,
}
398
///
/// QueryResponse
///
/// CBOR response to a query call, as far as this module reads it.
///

#[derive(Deserialize)]
struct QueryResponse {
    // "replied" marks success; any other value is treated as a rejection.
    status: String,
    // Reply payload; expected when `status` is "replied".
    reply: Option<QueryReply>,
    // Reject code reported alongside a rejection, when present.
    reject_code: Option<u64>,
    // Reject message reported alongside a rejection, when present.
    reject_message: Option<String>,
}
410
///
/// QueryReply
///
/// Payload of a successful query response.
///

#[derive(Deserialize)]
struct QueryReply {
    // Reply bytes; callers in this module decode them as Candid.
    #[serde(with = "serde_bytes")]
    arg: Vec<u8>,
}
420
421///
422/// SubnetRegistryResponseWire
423///
424
425#[derive(CandidType, Deserialize)]
426struct SubnetRegistryResponseWire(Vec<SubnetRegistryEntryWire>);
427
428impl SubnetRegistryResponseWire {
429    // Convert direct Candid query output into the command JSON shape the discovery parser accepts.
430    fn to_cli_json(&self) -> serde_json::Value {
431        serde_json::json!({
432            "Ok": self.0.iter().map(SubnetRegistryEntryWire::to_cli_json).collect::<Vec<_>>()
433        })
434    }
435}
436
437///
438/// SubnetRegistryEntryWire
439///
440
441#[derive(CandidType, Deserialize)]
442struct SubnetRegistryEntryWire {
443    pid: Principal,
444    role: String,
445    record: CanisterInfoWire,
446}
447
448impl SubnetRegistryEntryWire {
449    // Convert one registry entry into the command JSON shape used by existing list rendering.
450    fn to_cli_json(&self) -> serde_json::Value {
451        serde_json::json!({
452            "pid": self.pid.to_text(),
453            "role": self.role,
454            "record": self.record.to_cli_json(),
455        })
456    }
457}
458
459///
460/// CanisterInfoWire
461///
462
463#[derive(CandidType, Deserialize)]
464struct CanisterInfoWire {
465    pid: Principal,
466    role: String,
467    parent_pid: Option<Principal>,
468    module_hash: Option<Vec<u8>>,
469    created_at: u64,
470}
471
472impl CanisterInfoWire {
473    // Convert one canister info record into a CLI-like JSON object.
474    fn to_cli_json(&self) -> serde_json::Value {
475        serde_json::json!({
476            "pid": self.pid.to_text(),
477            "role": self.role,
478            "parent_pid": self.parent_pid.as_ref().map(Principal::to_text),
479            "module_hash": self.module_hash,
480            "created_at": self.created_at.to_string(),
481        })
482    }
483}
484
485///
486/// CanicErrorWire
487///
488
489#[derive(CandidType, Deserialize)]
490struct CanicErrorWire {
491    code: ErrorCodeWire,
492    message: String,
493}
494
495impl fmt::Display for CanicErrorWire {
496    // Render a compact public API error from a direct local replica query.
497    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
498        write!(formatter, "{:?}: {}", self.code, self.message)
499    }
500}
501
///
/// ErrorCodeWire
///
/// Error codes carried by `CanicErrorWire`. The derived `Debug` name is what
/// `CanicErrorWire`'s `Display` impl prints.
///

#[derive(CandidType, Debug, Deserialize)]
enum ErrorCodeWire {
    Conflict,
    Forbidden,
    Internal,
    InvalidInput,
    InvariantViolation,
    NotFound,
    PolicyInstanceRequiresSingletonWithDirectory,
    PolicyReplicaRequiresSingletonWithScaling,
    PolicyRoleAlreadyRegistered,
    PolicyShardRequiresSingletonWithSharding,
    PolicySingletonAlreadyRegisteredUnderParent,
    ResourceExhausted,
    Unauthorized,
    Unavailable,
}
523
#[cfg(test)]
mod tests {
    use super::*;

    // Ensure readiness parsing accepts common command-line JSON result shapes.
    #[test]
    fn parse_ready_json_value_accepts_nested_true_shapes() {
        assert!(parse_ready_json_value(&serde_json::json!(true)));
        assert!(parse_ready_json_value(&serde_json::json!({ "Ok": true })));
        assert!(parse_ready_json_value(&serde_json::json!([{ "Ok": true }])));
        assert!(parse_ready_json_value(&serde_json::json!({
            "response_candid": "(true)"
        })));
    }

    // Ensure readiness parsing rejects false and non-boolean result shapes.
    #[test]
    fn parse_ready_json_value_rejects_false_shapes() {
        assert!(!parse_ready_json_value(&serde_json::json!(false)));
        assert!(!parse_ready_json_value(&serde_json::json!({ "Ok": false })));
        assert!(!parse_ready_json_value(&serde_json::json!("true")));
    }

    // Ensure direct local queries use the ICP CLI local endpoint fallback when no project port is configured.
    #[test]
    fn local_replica_endpoint_defaults_to_icp_cli_port() {
        assert_eq!(
            local_replica_endpoint_with_port(None, None),
            "http://127.0.0.1:8000"
        );
        assert_eq!(
            local_replica_endpoint_with_port(None, Some(8001)),
            "http://127.0.0.1:8001"
        );
        assert_eq!(
            local_replica_endpoint_with_port(Some("http://127.0.0.1:9000/"), Some(8001)),
            "http://127.0.0.1:9000"
        );
    }

    // Ensure a JSON status body with a textual `root_key` member yields that text.
    #[test]
    fn parses_local_replica_root_key_from_json_status() {
        let root_key = parse_local_replica_root_key(br#"{"root_key":"308182"}"#);

        assert_eq!(root_key.as_deref(), Some("308182"));
    }

    // Ensure a CBOR status body with a byte-string `root_key` yields lowercase hex.
    #[test]
    fn parses_local_replica_root_key_from_cbor_status() {
        #[derive(Serialize)]
        struct Status {
            #[serde(with = "serde_bytes")]
            root_key: Vec<u8>,
        }

        let body = serde_cbor::to_vec(&Status {
            root_key: vec![0x30, 0x81, 0x82],
        })
        .expect("encode cbor status");
        let root_key = parse_local_replica_root_key(&body);

        assert_eq!(root_key.as_deref(), Some("308182"));
    }

    // Ensure whitespace-only JSON text and empty CBOR bytes are not reported as a root key.
    #[test]
    fn rejects_blank_local_replica_root_key_status_values() {
        #[derive(Serialize)]
        struct Status {
            #[serde(with = "serde_bytes")]
            root_key: Vec<u8>,
        }

        assert_eq!(parse_local_replica_root_key(br#"{"root_key":"   "}"#), None);

        let body = serde_cbor::to_vec(&Status { root_key: vec![] })
            .expect("encode empty cbor status root key");

        assert_eq!(parse_local_replica_root_key(&body), None);
    }
}
603}