Skip to main content

canic_host/
replica_query.rs

1use candid::{CandidType, Decode, Encode, Principal};
2use serde::{Deserialize, Serialize};
3use std::{
4    error::Error,
5    fmt,
6    io::{Read, Write},
7    net::TcpStream,
8    time::{SystemTime, UNIX_EPOCH},
9};
10
///
/// ReplicaQueryError
///
/// Failure modes of a direct query call made over the replica HTTP API.
///

#[derive(Debug)]
pub enum ReplicaQueryError {
    /// I/O failure, e.g. while connecting to, writing to, or reading from the replica socket.
    Io(std::io::Error),
    /// CBOR failure while encoding the request envelope or decoding the response.
    Cbor(serde_cbor::Error),
    /// JSON failure while rendering a decoded response.
    Json(serde_json::Error),
    /// Any other failure, carried as a plain message (invalid principal text, Candid
    /// encode/decode errors, malformed or non-2xx HTTP responses, canister-level errors).
    Query(String),
    /// The replica answered with a status other than `replied`; `code` and `message`
    /// fall back to their defaults when the response omits them.
    Rejected { code: u64, message: String },
}
23
24impl fmt::Display for ReplicaQueryError {
25    // Render local replica query failures as compact operator diagnostics.
26    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
27        match self {
28            Self::Io(err) => write!(formatter, "{err}"),
29            Self::Cbor(err) => write!(formatter, "{err}"),
30            Self::Json(err) => write!(formatter, "{err}"),
31            Self::Query(message) => write!(formatter, "{message}"),
32            Self::Rejected { code, message } => {
33                write!(
34                    formatter,
35                    "local replica rejected query: code={code} message={message}"
36                )
37            }
38        }
39    }
40}
41
42impl Error for ReplicaQueryError {
43    // Preserve structured source errors for I/O and serialization failures.
44    fn source(&self) -> Option<&(dyn Error + 'static)> {
45        match self {
46            Self::Io(err) => Some(err),
47            Self::Cbor(err) => Some(err),
48            Self::Json(err) => Some(err),
49            Self::Query(_) | Self::Rejected { .. } => None,
50        }
51    }
52}
53
54impl From<std::io::Error> for ReplicaQueryError {
55    // Convert local socket and process I/O failures.
56    fn from(err: std::io::Error) -> Self {
57        Self::Io(err)
58    }
59}
60
61impl From<serde_cbor::Error> for ReplicaQueryError {
62    // Convert CBOR encode/decode failures.
63    fn from(err: serde_cbor::Error) -> Self {
64        Self::Cbor(err)
65    }
66}
67
68impl From<serde_json::Error> for ReplicaQueryError {
69    // Convert JSON rendering failures.
70    fn from(err: serde_json::Error) -> Self {
71        Self::Json(err)
72    }
73}
74
/// Decide whether queries for `network` should go straight to the local replica.
///
/// Returns `true` when no network is given, when it is named `local`, or when it is an
/// `http://` URL; every other value returns `false`.
#[must_use]
pub fn should_use_local_replica_query(network: Option<&str>) -> bool {
    match network {
        None => true,
        Some(name) => name == "local" || name.starts_with("http://"),
    }
}
80
81/// Query `canic_ready` directly through the local replica HTTP API.
82pub fn query_ready(network: Option<&str>, canister: &str) -> Result<bool, ReplicaQueryError> {
83    let bytes = local_query(network, canister, "canic_ready")?;
84    Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
85}
86
87/// Parse common JSON shapes returned by command-line calls for `canic_ready`.
88#[must_use]
89pub fn parse_ready_json_value(data: &serde_json::Value) -> bool {
90    match data {
91        serde_json::Value::Bool(value) => *value,
92        serde_json::Value::String(value) => value.trim() == "(true)",
93        serde_json::Value::Array(values) => values.iter().any(parse_ready_json_value),
94        serde_json::Value::Object(map) => map.values().any(parse_ready_json_value),
95        _ => false,
96    }
97}
98
99/// Query `canic_subnet_registry` and render ICP-compatible JSON.
100pub fn query_subnet_registry_json(
101    network: Option<&str>,
102    root: &str,
103) -> Result<String, ReplicaQueryError> {
104    let bytes = local_query(network, root, "canic_subnet_registry")?;
105    let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
106        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
107    let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
108    serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
109}
110
111// Execute one anonymous query call against the local replica.
112fn local_query(
113    network: Option<&str>,
114    canister: &str,
115    method: &str,
116) -> Result<Vec<u8>, ReplicaQueryError> {
117    let canister_id =
118        Principal::from_text(canister).map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
119    let arg = Encode!().map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
120    let sender = Principal::anonymous();
121    let envelope = QueryEnvelope {
122        content: QueryContent {
123            request_type: "query",
124            canister_id: canister_id.as_slice(),
125            method_name: method,
126            arg: &arg,
127            sender: sender.as_slice(),
128            ingress_expiry: ingress_expiry_nanos()?,
129        },
130    };
131    let body = serde_cbor::to_vec(&envelope)?;
132    let endpoint = local_replica_endpoint(network);
133    let response = post_cbor(
134        &endpoint,
135        &format!("/api/v2/canister/{canister}/query"),
136        &body,
137    )?;
138    let query_response = serde_cbor::from_slice::<QueryResponse>(&response)?;
139
140    if query_response.status == "replied" {
141        return query_response
142            .reply
143            .map(|reply| reply.arg)
144            .ok_or_else(|| ReplicaQueryError::Query("missing query reply".to_string()));
145    }
146
147    Err(ReplicaQueryError::Rejected {
148        code: query_response.reject_code.unwrap_or_default(),
149        message: query_response.reject_message.unwrap_or_default(),
150    })
151}
152
// Pick the replica base URL: an explicit `http://` network value with trailing slashes
// removed, otherwise the conventional ICP CLI local address.
fn local_replica_endpoint(network: Option<&str>) -> String {
    match network {
        Some(url) if url.starts_with("http://") => url.trim_end_matches('/').to_owned(),
        _ => String::from("http://127.0.0.1:8000"),
    }
}
161
162// Return an ingress expiry comfortably in the near future for local queries.
163fn ingress_expiry_nanos() -> Result<u64, ReplicaQueryError> {
164    let now = SystemTime::now()
165        .duration_since(UNIX_EPOCH)
166        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
167    let expiry = now
168        .as_nanos()
169        .saturating_add(5 * 60 * 1_000_000_000)
170        .min(u128::from(u64::MAX));
171    u64::try_from(expiry).map_err(|err| ReplicaQueryError::Query(err.to_string()))
172}
173
174// POST one CBOR request over simple HTTP/1.1 and return the response body.
175fn post_cbor(endpoint: &str, path: &str, body: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
176    let (host, port) = parse_http_endpoint(endpoint)?;
177    let mut stream = TcpStream::connect((host.as_str(), port))?;
178    let request = format!(
179        "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/cbor\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
180        body.len()
181    );
182    stream.write_all(request.as_bytes())?;
183    stream.write_all(body)?;
184
185    let mut response = Vec::new();
186    stream.read_to_end(&mut response)?;
187    split_http_body(&response)
188}
189
190// Parse the limited HTTP endpoints supported by local direct queries.
191fn parse_http_endpoint(endpoint: &str) -> Result<(String, u16), ReplicaQueryError> {
192    let rest = endpoint
193        .strip_prefix("http://")
194        .ok_or_else(|| ReplicaQueryError::Query(format!("unsupported endpoint {endpoint}")))?;
195    let authority = rest.split('/').next().unwrap_or(rest);
196    let (host, port) = authority
197        .rsplit_once(':')
198        .ok_or_else(|| ReplicaQueryError::Query(format!("missing port in {endpoint}")))?;
199    let port = port
200        .parse::<u16>()
201        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
202    Ok((host.to_string(), port))
203}
204
205// Split a simple HTTP response and reject non-2xx status codes.
206fn split_http_body(response: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
207    let marker = b"\r\n\r\n";
208    let Some(index) = response
209        .windows(marker.len())
210        .position(|window| window == marker)
211    else {
212        return Err(ReplicaQueryError::Query(
213            "malformed HTTP response".to_string(),
214        ));
215    };
216    let header = String::from_utf8_lossy(&response[..index]);
217    let status_ok = header
218        .lines()
219        .next()
220        .is_some_and(|status| status.contains(" 2"));
221    if !status_ok {
222        return Err(ReplicaQueryError::Query(header.to_string()));
223    }
224    Ok(response[index + marker.len()..].to_vec())
225}
226
///
/// QueryEnvelope
///
/// Outermost value serialized to CBOR as the body of a query request.
/// It carries only `content`; no signature-related fields are present.
///

#[derive(Serialize)]
struct QueryEnvelope<'a> {
    // The query request itself.
    content: QueryContent<'a>,
}
235
///
/// QueryContent
///
/// Fields of one query request. All byte fields borrow from the caller and go through
/// `serde_bytes` rather than the default slice serialization.
///

#[derive(Serialize)]
struct QueryContent<'a> {
    // Request kind; `local_query` always sets this to "query".
    request_type: &'static str,
    // Raw bytes of the target canister principal.
    #[serde(with = "serde_bytes")]
    canister_id: &'a [u8],
    // Name of the canister method to call.
    method_name: &'a str,
    // Candid-encoded argument bytes.
    #[serde(with = "serde_bytes")]
    arg: &'a [u8],
    // Raw bytes of the calling principal; `local_query` uses the anonymous principal.
    #[serde(with = "serde_bytes")]
    sender: &'a [u8],
    // Expiry timestamp in nanoseconds since the Unix epoch.
    ingress_expiry: u64,
}
252
///
/// QueryResponse
///
/// CBOR-decoded response to a query request.
///

#[derive(Deserialize)]
struct QueryResponse {
    // Outcome marker; `local_query` treats exactly "replied" as success.
    status: String,
    // Reply payload; expected when `status` is "replied".
    reply: Option<QueryReply>,
    // Reject code reported for non-"replied" responses, when the replica provides one.
    reject_code: Option<u64>,
    // Reject message reported for non-"replied" responses, when the replica provides one.
    reject_message: Option<String>,
}
264
///
/// QueryReply
///
/// Payload of a successful query response.
///

#[derive(Deserialize)]
struct QueryReply {
    // Reply bytes; callers in this file decode them as Candid.
    #[serde(with = "serde_bytes")]
    arg: Vec<u8>,
}
274
///
/// SubnetRegistryResponseWire
///
/// Success payload decoded from a `canic_subnet_registry` reply: a list of registry entries.
///

#[derive(CandidType, Deserialize)]
struct SubnetRegistryResponseWire(Vec<SubnetRegistryEntryWire>);
281
282impl SubnetRegistryResponseWire {
283    // Convert direct Candid query output into the command JSON shape the discovery parser accepts.
284    fn to_cli_json(&self) -> serde_json::Value {
285        serde_json::json!({
286            "Ok": self.0.iter().map(SubnetRegistryEntryWire::to_cli_json).collect::<Vec<_>>()
287        })
288    }
289}
290
///
/// SubnetRegistryEntryWire
///
/// One entry of the subnet registry as decoded from Candid.
///

#[derive(CandidType, Deserialize)]
struct SubnetRegistryEntryWire {
    // Principal of the registered canister.
    pid: Principal,
    // Role name, kept as the string the canister reported.
    role: String,
    // Detailed canister record for this entry.
    record: CanisterInfoWire,
}
301
302impl SubnetRegistryEntryWire {
303    // Convert one registry entry into the command JSON shape used by existing list rendering.
304    fn to_cli_json(&self) -> serde_json::Value {
305        serde_json::json!({
306            "pid": self.pid.to_text(),
307            "role": self.role,
308            "record": self.record.to_cli_json(),
309        })
310    }
311}
312
///
/// CanisterInfoWire
///
/// Canister record nested inside each registry entry, as decoded from Candid.
///

#[derive(CandidType, Deserialize)]
struct CanisterInfoWire {
    // Principal of the canister.
    pid: Principal,
    // Role name, kept as the string the canister reported.
    role: String,
    // Principal of the parent canister, if any.
    parent_pid: Option<Principal>,
    // Module hash bytes, if present.
    module_hash: Option<Vec<u8>>,
    // Creation timestamp; rendered as a decimal string in the CLI JSON.
    // NOTE(review): the unit is not visible in this file; confirm against the canister API.
    created_at: u64,
}
325
326impl CanisterInfoWire {
327    // Convert one canister info record into a CLI-like JSON object.
328    fn to_cli_json(&self) -> serde_json::Value {
329        serde_json::json!({
330            "pid": self.pid.to_text(),
331            "role": self.role,
332            "parent_pid": self.parent_pid.as_ref().map(Principal::to_text),
333            "module_hash": self.module_hash,
334            "created_at": self.created_at.to_string(),
335        })
336    }
337}
338
///
/// CanicErrorWire
///
/// Error side of the `Result` decoded from a `canic_subnet_registry` reply.
///

#[derive(CandidType, Deserialize)]
struct CanicErrorWire {
    // Error category.
    code: ErrorCodeWire,
    // Error message text supplied by the canister.
    message: String,
}
348
349impl fmt::Display for CanicErrorWire {
350    // Render a compact public API error from a direct local replica query.
351    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
352        write!(formatter, "{:?}: {}", self.code, self.message)
353    }
354}
355
///
/// ErrorCodeWire
///
/// Error categories carried by `CanicErrorWire`. The `Debug` name of the variant is what
/// appears in rendered error messages.
///
/// NOTE(review): presumably this list must mirror the canister's public error-code
/// variant; confirm against the canister interface before adding or renaming variants.
///

#[derive(CandidType, Debug, Deserialize)]
enum ErrorCodeWire {
    Conflict,
    Forbidden,
    Internal,
    InvalidInput,
    InvariantViolation,
    NotFound,
    PolicyInstanceRequiresSingletonWithDirectory,
    PolicyReplicaRequiresSingletonWithScaling,
    PolicyRoleAlreadyRegistered,
    PolicyShardRequiresSingletonWithSharding,
    PolicySingletonAlreadyRegisteredUnderParent,
    ResourceExhausted,
    Unauthorized,
    Unavailable,
}
377
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Readiness parsing must accept each JSON shape that carries a true result.
    #[test]
    fn parse_ready_json_value_accepts_nested_true_shapes() {
        let ready_shapes = [
            json!(true),
            json!({ "Ok": true }),
            json!([{ "Ok": true }]),
            json!({ "response_candid": "(true)" }),
        ];

        for shape in &ready_shapes {
            assert!(parse_ready_json_value(shape));
        }
    }

    // Readiness parsing must reject false results and strings that are not `(true)`.
    #[test]
    fn parse_ready_json_value_rejects_false_shapes() {
        let not_ready_shapes = [json!(false), json!({ "Ok": false }), json!("true")];

        for shape in &not_ready_shapes {
            assert!(!parse_ready_json_value(shape));
        }
    }

    // Endpoint resolution must fall back to the ICP CLI local address and strip a
    // trailing slash from an explicit URL.
    #[test]
    fn local_replica_endpoint_defaults_to_icp_cli_port() {
        let cases = [
            (None, "http://127.0.0.1:8000"),
            (Some("http://127.0.0.1:9000/"), "http://127.0.0.1:9000"),
        ];

        for (network, expected) in cases {
            assert_eq!(local_replica_endpoint(network), expected);
        }
    }
}