Skip to main content

canic_host/
replica_query.rs

1use candid::{CandidType, Decode, Encode, Principal};
2use serde::{Deserialize, Serialize};
3use std::{
4    error::Error,
5    fmt,
6    io::{Read, Write},
7    net::TcpStream,
8    process::Command,
9    time::{SystemTime, UNIX_EPOCH},
10};
11
12use crate::dfx::run_output;
13
14///
15/// ReplicaQueryError
16///
17
18#[derive(Debug)]
19pub enum ReplicaQueryError {
20    Io(std::io::Error),
21    Cbor(serde_cbor::Error),
22    Json(serde_json::Error),
23    Query(String),
24    Rejected { code: u64, message: String },
25}
26
27impl fmt::Display for ReplicaQueryError {
28    // Render local replica query failures as compact operator diagnostics.
29    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
30        match self {
31            Self::Io(err) => write!(formatter, "{err}"),
32            Self::Cbor(err) => write!(formatter, "{err}"),
33            Self::Json(err) => write!(formatter, "{err}"),
34            Self::Query(message) => write!(formatter, "{message}"),
35            Self::Rejected { code, message } => {
36                write!(
37                    formatter,
38                    "local replica rejected query: code={code} message={message}"
39                )
40            }
41        }
42    }
43}
44
45impl Error for ReplicaQueryError {
46    // Preserve structured source errors for I/O and serialization failures.
47    fn source(&self) -> Option<&(dyn Error + 'static)> {
48        match self {
49            Self::Io(err) => Some(err),
50            Self::Cbor(err) => Some(err),
51            Self::Json(err) => Some(err),
52            Self::Query(_) | Self::Rejected { .. } => None,
53        }
54    }
55}
56
57impl From<std::io::Error> for ReplicaQueryError {
58    // Convert local socket and process I/O failures.
59    fn from(err: std::io::Error) -> Self {
60        Self::Io(err)
61    }
62}
63
64impl From<serde_cbor::Error> for ReplicaQueryError {
65    // Convert CBOR encode/decode failures.
66    fn from(err: serde_cbor::Error) -> Self {
67        Self::Cbor(err)
68    }
69}
70
71impl From<serde_json::Error> for ReplicaQueryError {
72    // Convert JSON rendering failures.
73    fn from(err: serde_json::Error) -> Self {
74        Self::Json(err)
75    }
76}
77
78/// Return whether the selected network should use direct local replica queries.
79#[must_use]
80pub fn should_use_local_replica_query(network: Option<&str>) -> bool {
81    network.is_none_or(|network| network == "local" || network.starts_with("http://"))
82}
83
84/// Query `canic_ready` directly through the local replica HTTP API.
85pub fn query_ready(
86    dfx: &str,
87    network: Option<&str>,
88    canister: &str,
89) -> Result<bool, ReplicaQueryError> {
90    let bytes = local_query(dfx, network, canister, "canic_ready")?;
91    Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
92}
93
94/// Parse common JSON shapes returned by dfx for `canic_ready`.
95#[must_use]
96pub fn parse_ready_json_value(data: &serde_json::Value) -> bool {
97    match data {
98        serde_json::Value::Bool(value) => *value,
99        serde_json::Value::Array(values) => values.iter().any(parse_ready_json_value),
100        serde_json::Value::Object(map) => map.values().any(parse_ready_json_value),
101        _ => false,
102    }
103}
104
105/// Query `canic_subnet_registry` and render DFX-compatible JSON.
106pub fn query_subnet_registry_json(
107    dfx: &str,
108    network: Option<&str>,
109    root: &str,
110) -> Result<String, ReplicaQueryError> {
111    let bytes = local_query(dfx, network, root, "canic_subnet_registry")?;
112    let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
113        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
114    let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
115    serde_json::to_string(&response.to_dfx_json()).map_err(ReplicaQueryError::from)
116}
117
118// Execute one anonymous query call against the local replica.
119fn local_query(
120    dfx: &str,
121    network: Option<&str>,
122    canister: &str,
123    method: &str,
124) -> Result<Vec<u8>, ReplicaQueryError> {
125    let canister_id =
126        Principal::from_text(canister).map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
127    let arg = Encode!().map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
128    let sender = Principal::anonymous();
129    let envelope = QueryEnvelope {
130        content: QueryContent {
131            request_type: "query",
132            canister_id: canister_id.as_slice(),
133            method_name: method,
134            arg: &arg,
135            sender: sender.as_slice(),
136            ingress_expiry: ingress_expiry_nanos()?,
137        },
138    };
139    let body = serde_cbor::to_vec(&envelope)?;
140    let endpoint = local_replica_endpoint(dfx, network);
141    let response = post_cbor(
142        &endpoint,
143        &format!("/api/v2/canister/{canister}/query"),
144        &body,
145    )?;
146    let query_response = serde_cbor::from_slice::<QueryResponse>(&response)?;
147
148    if query_response.status == "replied" {
149        return query_response
150            .reply
151            .map(|reply| reply.arg)
152            .ok_or_else(|| ReplicaQueryError::Query("missing query reply".to_string()));
153    }
154
155    Err(ReplicaQueryError::Rejected {
156        code: query_response.reject_code.unwrap_or_default(),
157        message: query_response.reject_message.unwrap_or_default(),
158    })
159}
160
161// Resolve the local replica endpoint from explicit URL or the current DFX port.
162fn local_replica_endpoint(dfx: &str, network: Option<&str>) -> String {
163    if let Some(network) = network.filter(|network| network.starts_with("http://")) {
164        return network.trim_end_matches('/').to_string();
165    }
166
167    let mut command = Command::new(dfx);
168    command.args(["info", "webserver-port"]);
169    let port = run_output(&mut command).unwrap_or_else(|_| "4943".to_string());
170    format!("http://127.0.0.1:{port}")
171}
172
173// Return an ingress expiry comfortably in the near future for local queries.
174fn ingress_expiry_nanos() -> Result<u64, ReplicaQueryError> {
175    let now = SystemTime::now()
176        .duration_since(UNIX_EPOCH)
177        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
178    let expiry = now
179        .as_nanos()
180        .saturating_add(5 * 60 * 1_000_000_000)
181        .min(u128::from(u64::MAX));
182    u64::try_from(expiry).map_err(|err| ReplicaQueryError::Query(err.to_string()))
183}
184
185// POST one CBOR request over simple HTTP/1.1 and return the response body.
186fn post_cbor(endpoint: &str, path: &str, body: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
187    let (host, port) = parse_http_endpoint(endpoint)?;
188    let mut stream = TcpStream::connect((host.as_str(), port))?;
189    let request = format!(
190        "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/cbor\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
191        body.len()
192    );
193    stream.write_all(request.as_bytes())?;
194    stream.write_all(body)?;
195
196    let mut response = Vec::new();
197    stream.read_to_end(&mut response)?;
198    split_http_body(&response)
199}
200
201// Parse the limited HTTP endpoints supported by local direct queries.
202fn parse_http_endpoint(endpoint: &str) -> Result<(String, u16), ReplicaQueryError> {
203    let rest = endpoint
204        .strip_prefix("http://")
205        .ok_or_else(|| ReplicaQueryError::Query(format!("unsupported endpoint {endpoint}")))?;
206    let authority = rest.split('/').next().unwrap_or(rest);
207    let (host, port) = authority
208        .rsplit_once(':')
209        .ok_or_else(|| ReplicaQueryError::Query(format!("missing port in {endpoint}")))?;
210    let port = port
211        .parse::<u16>()
212        .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
213    Ok((host.to_string(), port))
214}
215
216// Split a simple HTTP response and reject non-2xx status codes.
217fn split_http_body(response: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
218    let marker = b"\r\n\r\n";
219    let Some(index) = response
220        .windows(marker.len())
221        .position(|window| window == marker)
222    else {
223        return Err(ReplicaQueryError::Query(
224            "malformed HTTP response".to_string(),
225        ));
226    };
227    let header = String::from_utf8_lossy(&response[..index]);
228    let status_ok = header
229        .lines()
230        .next()
231        .is_some_and(|status| status.contains(" 2"));
232    if !status_ok {
233        return Err(ReplicaQueryError::Query(header.to_string()));
234    }
235    Ok(response[index + marker.len()..].to_vec())
236}
237
238///
239/// QueryEnvelope
240///
241
242#[derive(Serialize)]
243struct QueryEnvelope<'a> {
244    content: QueryContent<'a>,
245}
246
247///
248/// QueryContent
249///
250
251#[derive(Serialize)]
252struct QueryContent<'a> {
253    request_type: &'static str,
254    #[serde(with = "serde_bytes")]
255    canister_id: &'a [u8],
256    method_name: &'a str,
257    #[serde(with = "serde_bytes")]
258    arg: &'a [u8],
259    #[serde(with = "serde_bytes")]
260    sender: &'a [u8],
261    ingress_expiry: u64,
262}
263
264///
265/// QueryResponse
266///
267
268#[derive(Deserialize)]
269struct QueryResponse {
270    status: String,
271    reply: Option<QueryReply>,
272    reject_code: Option<u64>,
273    reject_message: Option<String>,
274}
275
276///
277/// QueryReply
278///
279
280#[derive(Deserialize)]
281struct QueryReply {
282    #[serde(with = "serde_bytes")]
283    arg: Vec<u8>,
284}
285
286///
287/// SubnetRegistryResponseWire
288///
289
290#[derive(CandidType, Deserialize)]
291struct SubnetRegistryResponseWire(Vec<SubnetRegistryEntryWire>);
292
293impl SubnetRegistryResponseWire {
294    // Convert direct Candid query output into the DFX JSON shape the discovery parser accepts.
295    fn to_dfx_json(&self) -> serde_json::Value {
296        serde_json::json!({
297            "Ok": self.0.iter().map(SubnetRegistryEntryWire::to_dfx_json).collect::<Vec<_>>()
298        })
299    }
300}
301
302///
303/// SubnetRegistryEntryWire
304///
305
306#[derive(CandidType, Deserialize)]
307struct SubnetRegistryEntryWire {
308    pid: Principal,
309    role: String,
310    record: CanisterInfoWire,
311}
312
313impl SubnetRegistryEntryWire {
314    // Convert one registry entry into the DFX JSON shape used by existing list rendering.
315    fn to_dfx_json(&self) -> serde_json::Value {
316        serde_json::json!({
317            "pid": self.pid.to_text(),
318            "role": self.role,
319            "record": self.record.to_dfx_json(),
320        })
321    }
322}
323
324///
325/// CanisterInfoWire
326///
327
328#[derive(CandidType, Deserialize)]
329struct CanisterInfoWire {
330    pid: Principal,
331    role: String,
332    parent_pid: Option<Principal>,
333    module_hash: Option<Vec<u8>>,
334    created_at: u64,
335}
336
337impl CanisterInfoWire {
338    // Convert one canister info record into a DFX-like JSON object.
339    fn to_dfx_json(&self) -> serde_json::Value {
340        serde_json::json!({
341            "pid": self.pid.to_text(),
342            "role": self.role,
343            "parent_pid": self.parent_pid.as_ref().map(Principal::to_text),
344            "module_hash": self.module_hash,
345            "created_at": self.created_at.to_string(),
346        })
347    }
348}
349
350///
351/// CanicErrorWire
352///
353
354#[derive(CandidType, Deserialize)]
355struct CanicErrorWire {
356    code: ErrorCodeWire,
357    message: String,
358}
359
360impl fmt::Display for CanicErrorWire {
361    // Render a compact public API error from a direct local replica query.
362    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
363        write!(formatter, "{:?}: {}", self.code, self.message)
364    }
365}
366
367///
368/// ErrorCodeWire
369///
370
371#[derive(CandidType, Debug, Deserialize)]
372enum ErrorCodeWire {
373    Conflict,
374    Forbidden,
375    Internal,
376    InvalidInput,
377    InvariantViolation,
378    NotFound,
379    PolicyInstanceRequiresSingletonWithDirectory,
380    PolicyReplicaRequiresSingletonWithScaling,
381    PolicyRoleAlreadyRegistered,
382    PolicyShardRequiresSingletonWithSharding,
383    PolicySingletonAlreadyRegisteredUnderParent,
384    ResourceExhausted,
385    Unauthorized,
386    Unavailable,
387}
388
389#[cfg(test)]
390mod tests {
391    use super::*;
392
393    // Ensure readiness parsing accepts the common dfx JSON result shapes.
394    #[test]
395    fn parse_ready_json_value_accepts_nested_true_shapes() {
396        assert!(parse_ready_json_value(&serde_json::json!(true)));
397        assert!(parse_ready_json_value(&serde_json::json!({ "Ok": true })));
398        assert!(parse_ready_json_value(&serde_json::json!([{ "Ok": true }])));
399    }
400
401    // Ensure readiness parsing rejects false and non-boolean result shapes.
402    #[test]
403    fn parse_ready_json_value_rejects_false_shapes() {
404        assert!(!parse_ready_json_value(&serde_json::json!(false)));
405        assert!(!parse_ready_json_value(&serde_json::json!({ "Ok": false })));
406        assert!(!parse_ready_json_value(&serde_json::json!("true")));
407    }
408}