1use candid::{CandidType, Decode, Encode, Principal};
2use serde::{Deserialize, Serialize};
3use std::{
4 error::Error,
5 fmt,
6 io::{Read, Write},
7 net::TcpStream,
8 process::Command,
9 time::{SystemTime, UNIX_EPOCH},
10};
11
12use crate::dfx::run_output;
13
14#[derive(Debug)]
19pub enum ReplicaQueryError {
20 Io(std::io::Error),
21 Cbor(serde_cbor::Error),
22 Json(serde_json::Error),
23 Query(String),
24 Rejected { code: u64, message: String },
25}
26
27impl fmt::Display for ReplicaQueryError {
28 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
30 match self {
31 Self::Io(err) => write!(formatter, "{err}"),
32 Self::Cbor(err) => write!(formatter, "{err}"),
33 Self::Json(err) => write!(formatter, "{err}"),
34 Self::Query(message) => write!(formatter, "{message}"),
35 Self::Rejected { code, message } => {
36 write!(
37 formatter,
38 "local replica rejected query: code={code} message={message}"
39 )
40 }
41 }
42 }
43}
44
45impl Error for ReplicaQueryError {
46 fn source(&self) -> Option<&(dyn Error + 'static)> {
48 match self {
49 Self::Io(err) => Some(err),
50 Self::Cbor(err) => Some(err),
51 Self::Json(err) => Some(err),
52 Self::Query(_) | Self::Rejected { .. } => None,
53 }
54 }
55}
56
57impl From<std::io::Error> for ReplicaQueryError {
58 fn from(err: std::io::Error) -> Self {
60 Self::Io(err)
61 }
62}
63
64impl From<serde_cbor::Error> for ReplicaQueryError {
65 fn from(err: serde_cbor::Error) -> Self {
67 Self::Cbor(err)
68 }
69}
70
71impl From<serde_json::Error> for ReplicaQueryError {
72 fn from(err: serde_json::Error) -> Self {
74 Self::Json(err)
75 }
76}
77
78#[must_use]
80pub fn should_use_local_replica_query(network: Option<&str>) -> bool {
81 network.is_none_or(|network| network == "local" || network.starts_with("http://"))
82}
83
84pub fn query_ready(
86 dfx: &str,
87 network: Option<&str>,
88 canister: &str,
89) -> Result<bool, ReplicaQueryError> {
90 let bytes = local_query(dfx, network, canister, "canic_ready")?;
91 Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
92}
93
94pub fn query_subnet_registry_json(
96 dfx: &str,
97 network: Option<&str>,
98 root: &str,
99) -> Result<String, ReplicaQueryError> {
100 let bytes = local_query(dfx, network, root, "canic_subnet_registry")?;
101 let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
102 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
103 let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
104 serde_json::to_string(&response.to_dfx_json()).map_err(ReplicaQueryError::from)
105}
106
107fn local_query(
109 dfx: &str,
110 network: Option<&str>,
111 canister: &str,
112 method: &str,
113) -> Result<Vec<u8>, ReplicaQueryError> {
114 let canister_id =
115 Principal::from_text(canister).map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
116 let arg = Encode!().map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
117 let sender = Principal::anonymous();
118 let envelope = QueryEnvelope {
119 content: QueryContent {
120 request_type: "query",
121 canister_id: canister_id.as_slice(),
122 method_name: method,
123 arg: &arg,
124 sender: sender.as_slice(),
125 ingress_expiry: ingress_expiry_nanos()?,
126 },
127 };
128 let body = serde_cbor::to_vec(&envelope)?;
129 let endpoint = local_replica_endpoint(dfx, network);
130 let response = post_cbor(
131 &endpoint,
132 &format!("/api/v2/canister/{canister}/query"),
133 &body,
134 )?;
135 let query_response = serde_cbor::from_slice::<QueryResponse>(&response)?;
136
137 if query_response.status == "replied" {
138 return query_response
139 .reply
140 .map(|reply| reply.arg)
141 .ok_or_else(|| ReplicaQueryError::Query("missing query reply".to_string()));
142 }
143
144 Err(ReplicaQueryError::Rejected {
145 code: query_response.reject_code.unwrap_or_default(),
146 message: query_response.reject_message.unwrap_or_default(),
147 })
148}
149
150fn local_replica_endpoint(dfx: &str, network: Option<&str>) -> String {
152 if let Some(network) = network.filter(|network| network.starts_with("http://")) {
153 return network.trim_end_matches('/').to_string();
154 }
155
156 let mut command = Command::new(dfx);
157 command.args(["info", "webserver-port"]);
158 let port = run_output(&mut command).unwrap_or_else(|_| "4943".to_string());
159 format!("http://127.0.0.1:{port}")
160}
161
162fn ingress_expiry_nanos() -> Result<u64, ReplicaQueryError> {
164 let now = SystemTime::now()
165 .duration_since(UNIX_EPOCH)
166 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
167 let expiry = now
168 .as_nanos()
169 .saturating_add(5 * 60 * 1_000_000_000)
170 .min(u128::from(u64::MAX));
171 u64::try_from(expiry).map_err(|err| ReplicaQueryError::Query(err.to_string()))
172}
173
174fn post_cbor(endpoint: &str, path: &str, body: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
176 let (host, port) = parse_http_endpoint(endpoint)?;
177 let mut stream = TcpStream::connect((host.as_str(), port))?;
178 let request = format!(
179 "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/cbor\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
180 body.len()
181 );
182 stream.write_all(request.as_bytes())?;
183 stream.write_all(body)?;
184
185 let mut response = Vec::new();
186 stream.read_to_end(&mut response)?;
187 split_http_body(&response)
188}
189
190fn parse_http_endpoint(endpoint: &str) -> Result<(String, u16), ReplicaQueryError> {
192 let rest = endpoint
193 .strip_prefix("http://")
194 .ok_or_else(|| ReplicaQueryError::Query(format!("unsupported endpoint {endpoint}")))?;
195 let authority = rest.split('/').next().unwrap_or(rest);
196 let (host, port) = authority
197 .rsplit_once(':')
198 .ok_or_else(|| ReplicaQueryError::Query(format!("missing port in {endpoint}")))?;
199 let port = port
200 .parse::<u16>()
201 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
202 Ok((host.to_string(), port))
203}
204
205fn split_http_body(response: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
207 let marker = b"\r\n\r\n";
208 let Some(index) = response
209 .windows(marker.len())
210 .position(|window| window == marker)
211 else {
212 return Err(ReplicaQueryError::Query(
213 "malformed HTTP response".to_string(),
214 ));
215 };
216 let header = String::from_utf8_lossy(&response[..index]);
217 let status_ok = header
218 .lines()
219 .next()
220 .is_some_and(|status| status.contains(" 2"));
221 if !status_ok {
222 return Err(ReplicaQueryError::Query(header.to_string()));
223 }
224 Ok(response[index + marker.len()..].to_vec())
225}
226
227#[derive(Serialize)]
232struct QueryEnvelope<'a> {
233 content: QueryContent<'a>,
234}
235
236#[derive(Serialize)]
241struct QueryContent<'a> {
242 request_type: &'static str,
243 #[serde(with = "serde_bytes")]
244 canister_id: &'a [u8],
245 method_name: &'a str,
246 #[serde(with = "serde_bytes")]
247 arg: &'a [u8],
248 #[serde(with = "serde_bytes")]
249 sender: &'a [u8],
250 ingress_expiry: u64,
251}
252
253#[derive(Deserialize)]
258struct QueryResponse {
259 status: String,
260 reply: Option<QueryReply>,
261 reject_code: Option<u64>,
262 reject_message: Option<String>,
263}
264
265#[derive(Deserialize)]
270struct QueryReply {
271 #[serde(with = "serde_bytes")]
272 arg: Vec<u8>,
273}
274
275#[derive(CandidType, Deserialize)]
280struct SubnetRegistryResponseWire(Vec<SubnetRegistryEntryWire>);
281
282impl SubnetRegistryResponseWire {
283 fn to_dfx_json(&self) -> serde_json::Value {
285 serde_json::json!({
286 "Ok": self.0.iter().map(SubnetRegistryEntryWire::to_dfx_json).collect::<Vec<_>>()
287 })
288 }
289}
290
291#[derive(CandidType, Deserialize)]
296struct SubnetRegistryEntryWire {
297 pid: Principal,
298 role: String,
299 record: CanisterInfoWire,
300}
301
302impl SubnetRegistryEntryWire {
303 fn to_dfx_json(&self) -> serde_json::Value {
305 serde_json::json!({
306 "pid": self.pid.to_text(),
307 "role": self.role,
308 "record": self.record.to_dfx_json(),
309 })
310 }
311}
312
313#[derive(CandidType, Deserialize)]
318struct CanisterInfoWire {
319 pid: Principal,
320 role: String,
321 parent_pid: Option<Principal>,
322 module_hash: Option<Vec<u8>>,
323 created_at: u64,
324}
325
326impl CanisterInfoWire {
327 fn to_dfx_json(&self) -> serde_json::Value {
329 serde_json::json!({
330 "pid": self.pid.to_text(),
331 "role": self.role,
332 "parent_pid": self.parent_pid.as_ref().map(Principal::to_text),
333 "module_hash": self.module_hash,
334 "created_at": self.created_at.to_string(),
335 })
336 }
337}
338
339#[derive(CandidType, Deserialize)]
344struct CanicErrorWire {
345 code: ErrorCodeWire,
346 message: String,
347}
348
349impl fmt::Display for CanicErrorWire {
350 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
352 write!(formatter, "{:?}: {}", self.code, self.message)
353 }
354}
355
356#[derive(CandidType, Debug, Deserialize)]
361enum ErrorCodeWire {
362 Conflict,
363 Forbidden,
364 Internal,
365 InvalidInput,
366 InvariantViolation,
367 NotFound,
368 PolicyInstanceRequiresSingletonWithDirectory,
369 PolicyReplicaRequiresSingletonWithScaling,
370 PolicyRoleAlreadyRegistered,
371 PolicyShardRequiresSingletonWithSharding,
372 PolicySingletonAlreadyRegisteredUnderParent,
373 ResourceExhausted,
374 Unauthorized,
375 Unavailable,
376}