1use candid::{CandidType, Decode, Encode, Principal};
2use serde::{Deserialize, Serialize};
3use std::{
4 error::Error,
5 fmt,
6 io::{Read, Write},
7 net::TcpStream,
8 process::Command,
9 time::{SystemTime, UNIX_EPOCH},
10};
11
12use crate::dfx::run_output;
13
14#[derive(Debug)]
19pub enum ReplicaQueryError {
20 Io(std::io::Error),
21 Cbor(serde_cbor::Error),
22 Json(serde_json::Error),
23 Query(String),
24 Rejected { code: u64, message: String },
25}
26
27impl fmt::Display for ReplicaQueryError {
28 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
30 match self {
31 Self::Io(err) => write!(formatter, "{err}"),
32 Self::Cbor(err) => write!(formatter, "{err}"),
33 Self::Json(err) => write!(formatter, "{err}"),
34 Self::Query(message) => write!(formatter, "{message}"),
35 Self::Rejected { code, message } => {
36 write!(
37 formatter,
38 "local replica rejected query: code={code} message={message}"
39 )
40 }
41 }
42 }
43}
44
45impl Error for ReplicaQueryError {
46 fn source(&self) -> Option<&(dyn Error + 'static)> {
48 match self {
49 Self::Io(err) => Some(err),
50 Self::Cbor(err) => Some(err),
51 Self::Json(err) => Some(err),
52 Self::Query(_) | Self::Rejected { .. } => None,
53 }
54 }
55}
56
57impl From<std::io::Error> for ReplicaQueryError {
58 fn from(err: std::io::Error) -> Self {
60 Self::Io(err)
61 }
62}
63
64impl From<serde_cbor::Error> for ReplicaQueryError {
65 fn from(err: serde_cbor::Error) -> Self {
67 Self::Cbor(err)
68 }
69}
70
71impl From<serde_json::Error> for ReplicaQueryError {
72 fn from(err: serde_json::Error) -> Self {
74 Self::Json(err)
75 }
76}
77
78#[must_use]
80pub fn should_use_local_replica_query(network: Option<&str>) -> bool {
81 network.is_none_or(|network| network == "local" || network.starts_with("http://"))
82}
83
84pub fn query_ready(
86 dfx: &str,
87 network: Option<&str>,
88 canister: &str,
89) -> Result<bool, ReplicaQueryError> {
90 let bytes = local_query(dfx, network, canister, "canic_ready")?;
91 Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
92}
93
94#[must_use]
96pub fn parse_ready_json_value(data: &serde_json::Value) -> bool {
97 match data {
98 serde_json::Value::Bool(value) => *value,
99 serde_json::Value::Array(values) => values.iter().any(parse_ready_json_value),
100 serde_json::Value::Object(map) => map.values().any(parse_ready_json_value),
101 _ => false,
102 }
103}
104
105pub fn query_subnet_registry_json(
107 dfx: &str,
108 network: Option<&str>,
109 root: &str,
110) -> Result<String, ReplicaQueryError> {
111 let bytes = local_query(dfx, network, root, "canic_subnet_registry")?;
112 let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
113 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
114 let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
115 serde_json::to_string(&response.to_dfx_json()).map_err(ReplicaQueryError::from)
116}
117
118fn local_query(
120 dfx: &str,
121 network: Option<&str>,
122 canister: &str,
123 method: &str,
124) -> Result<Vec<u8>, ReplicaQueryError> {
125 let canister_id =
126 Principal::from_text(canister).map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
127 let arg = Encode!().map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
128 let sender = Principal::anonymous();
129 let envelope = QueryEnvelope {
130 content: QueryContent {
131 request_type: "query",
132 canister_id: canister_id.as_slice(),
133 method_name: method,
134 arg: &arg,
135 sender: sender.as_slice(),
136 ingress_expiry: ingress_expiry_nanos()?,
137 },
138 };
139 let body = serde_cbor::to_vec(&envelope)?;
140 let endpoint = local_replica_endpoint(dfx, network);
141 let response = post_cbor(
142 &endpoint,
143 &format!("/api/v2/canister/{canister}/query"),
144 &body,
145 )?;
146 let query_response = serde_cbor::from_slice::<QueryResponse>(&response)?;
147
148 if query_response.status == "replied" {
149 return query_response
150 .reply
151 .map(|reply| reply.arg)
152 .ok_or_else(|| ReplicaQueryError::Query("missing query reply".to_string()));
153 }
154
155 Err(ReplicaQueryError::Rejected {
156 code: query_response.reject_code.unwrap_or_default(),
157 message: query_response.reject_message.unwrap_or_default(),
158 })
159}
160
161fn local_replica_endpoint(dfx: &str, network: Option<&str>) -> String {
163 if let Some(network) = network.filter(|network| network.starts_with("http://")) {
164 return network.trim_end_matches('/').to_string();
165 }
166
167 let mut command = Command::new(dfx);
168 command.args(["info", "webserver-port"]);
169 let port = run_output(&mut command).unwrap_or_else(|_| "4943".to_string());
170 format!("http://127.0.0.1:{port}")
171}
172
173fn ingress_expiry_nanos() -> Result<u64, ReplicaQueryError> {
175 let now = SystemTime::now()
176 .duration_since(UNIX_EPOCH)
177 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
178 let expiry = now
179 .as_nanos()
180 .saturating_add(5 * 60 * 1_000_000_000)
181 .min(u128::from(u64::MAX));
182 u64::try_from(expiry).map_err(|err| ReplicaQueryError::Query(err.to_string()))
183}
184
185fn post_cbor(endpoint: &str, path: &str, body: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
187 let (host, port) = parse_http_endpoint(endpoint)?;
188 let mut stream = TcpStream::connect((host.as_str(), port))?;
189 let request = format!(
190 "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/cbor\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
191 body.len()
192 );
193 stream.write_all(request.as_bytes())?;
194 stream.write_all(body)?;
195
196 let mut response = Vec::new();
197 stream.read_to_end(&mut response)?;
198 split_http_body(&response)
199}
200
201fn parse_http_endpoint(endpoint: &str) -> Result<(String, u16), ReplicaQueryError> {
203 let rest = endpoint
204 .strip_prefix("http://")
205 .ok_or_else(|| ReplicaQueryError::Query(format!("unsupported endpoint {endpoint}")))?;
206 let authority = rest.split('/').next().unwrap_or(rest);
207 let (host, port) = authority
208 .rsplit_once(':')
209 .ok_or_else(|| ReplicaQueryError::Query(format!("missing port in {endpoint}")))?;
210 let port = port
211 .parse::<u16>()
212 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
213 Ok((host.to_string(), port))
214}
215
216fn split_http_body(response: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
218 let marker = b"\r\n\r\n";
219 let Some(index) = response
220 .windows(marker.len())
221 .position(|window| window == marker)
222 else {
223 return Err(ReplicaQueryError::Query(
224 "malformed HTTP response".to_string(),
225 ));
226 };
227 let header = String::from_utf8_lossy(&response[..index]);
228 let status_ok = header
229 .lines()
230 .next()
231 .is_some_and(|status| status.contains(" 2"));
232 if !status_ok {
233 return Err(ReplicaQueryError::Query(header.to_string()));
234 }
235 Ok(response[index + marker.len()..].to_vec())
236}
237
238#[derive(Serialize)]
243struct QueryEnvelope<'a> {
244 content: QueryContent<'a>,
245}
246
247#[derive(Serialize)]
252struct QueryContent<'a> {
253 request_type: &'static str,
254 #[serde(with = "serde_bytes")]
255 canister_id: &'a [u8],
256 method_name: &'a str,
257 #[serde(with = "serde_bytes")]
258 arg: &'a [u8],
259 #[serde(with = "serde_bytes")]
260 sender: &'a [u8],
261 ingress_expiry: u64,
262}
263
264#[derive(Deserialize)]
269struct QueryResponse {
270 status: String,
271 reply: Option<QueryReply>,
272 reject_code: Option<u64>,
273 reject_message: Option<String>,
274}
275
276#[derive(Deserialize)]
281struct QueryReply {
282 #[serde(with = "serde_bytes")]
283 arg: Vec<u8>,
284}
285
286#[derive(CandidType, Deserialize)]
291struct SubnetRegistryResponseWire(Vec<SubnetRegistryEntryWire>);
292
293impl SubnetRegistryResponseWire {
294 fn to_dfx_json(&self) -> serde_json::Value {
296 serde_json::json!({
297 "Ok": self.0.iter().map(SubnetRegistryEntryWire::to_dfx_json).collect::<Vec<_>>()
298 })
299 }
300}
301
302#[derive(CandidType, Deserialize)]
307struct SubnetRegistryEntryWire {
308 pid: Principal,
309 role: String,
310 record: CanisterInfoWire,
311}
312
313impl SubnetRegistryEntryWire {
314 fn to_dfx_json(&self) -> serde_json::Value {
316 serde_json::json!({
317 "pid": self.pid.to_text(),
318 "role": self.role,
319 "record": self.record.to_dfx_json(),
320 })
321 }
322}
323
324#[derive(CandidType, Deserialize)]
329struct CanisterInfoWire {
330 pid: Principal,
331 role: String,
332 parent_pid: Option<Principal>,
333 module_hash: Option<Vec<u8>>,
334 created_at: u64,
335}
336
337impl CanisterInfoWire {
338 fn to_dfx_json(&self) -> serde_json::Value {
340 serde_json::json!({
341 "pid": self.pid.to_text(),
342 "role": self.role,
343 "parent_pid": self.parent_pid.as_ref().map(Principal::to_text),
344 "module_hash": self.module_hash,
345 "created_at": self.created_at.to_string(),
346 })
347 }
348}
349
350#[derive(CandidType, Deserialize)]
355struct CanicErrorWire {
356 code: ErrorCodeWire,
357 message: String,
358}
359
360impl fmt::Display for CanicErrorWire {
361 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
363 write!(formatter, "{:?}: {}", self.code, self.message)
364 }
365}
366
367#[derive(CandidType, Debug, Deserialize)]
372enum ErrorCodeWire {
373 Conflict,
374 Forbidden,
375 Internal,
376 InvalidInput,
377 InvariantViolation,
378 NotFound,
379 PolicyInstanceRequiresSingletonWithDirectory,
380 PolicyReplicaRequiresSingletonWithScaling,
381 PolicyRoleAlreadyRegistered,
382 PolicyShardRequiresSingletonWithSharding,
383 PolicySingletonAlreadyRegisteredUnderParent,
384 ResourceExhausted,
385 Unauthorized,
386 Unavailable,
387}
388
389#[cfg(test)]
390mod tests {
391 use super::*;
392
393 #[test]
395 fn parse_ready_json_value_accepts_nested_true_shapes() {
396 assert!(parse_ready_json_value(&serde_json::json!(true)));
397 assert!(parse_ready_json_value(&serde_json::json!({ "Ok": true })));
398 assert!(parse_ready_json_value(&serde_json::json!([{ "Ok": true }])));
399 }
400
401 #[test]
403 fn parse_ready_json_value_rejects_false_shapes() {
404 assert!(!parse_ready_json_value(&serde_json::json!(false)));
405 assert!(!parse_ready_json_value(&serde_json::json!({ "Ok": false })));
406 assert!(!parse_ready_json_value(&serde_json::json!("true")));
407 }
408}