1use candid::{CandidType, Decode, Encode, Principal};
2use serde::{Deserialize, Serialize};
3use std::{
4 error::Error,
5 fmt,
6 io::{Read, Write},
7 net::TcpStream,
8 time::{SystemTime, UNIX_EPOCH},
9};
10
/// Failure modes of a direct HTTP query against a replica.
#[derive(Debug)]
pub enum ReplicaQueryError {
    /// An I/O operation failed; produced by `?` on `std::io::Error` values.
    Io(std::io::Error),
    /// CBOR encoding or decoding failed.
    Cbor(serde_cbor::Error),
    /// JSON serialization failed.
    Json(serde_json::Error),
    /// Any other failure, described by a free-form message.
    Query(String),
    /// The replica returned a response that was not a successful reply;
    /// carries the reject code and message taken from that response.
    Rejected { code: u64, message: String },
}
23
24impl fmt::Display for ReplicaQueryError {
25 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
27 match self {
28 Self::Io(err) => write!(formatter, "{err}"),
29 Self::Cbor(err) => write!(formatter, "{err}"),
30 Self::Json(err) => write!(formatter, "{err}"),
31 Self::Query(message) => write!(formatter, "{message}"),
32 Self::Rejected { code, message } => {
33 write!(
34 formatter,
35 "local replica rejected query: code={code} message={message}"
36 )
37 }
38 }
39 }
40}
41
42impl Error for ReplicaQueryError {
43 fn source(&self) -> Option<&(dyn Error + 'static)> {
45 match self {
46 Self::Io(err) => Some(err),
47 Self::Cbor(err) => Some(err),
48 Self::Json(err) => Some(err),
49 Self::Query(_) | Self::Rejected { .. } => None,
50 }
51 }
52}
53
54impl From<std::io::Error> for ReplicaQueryError {
55 fn from(err: std::io::Error) -> Self {
57 Self::Io(err)
58 }
59}
60
61impl From<serde_cbor::Error> for ReplicaQueryError {
62 fn from(err: serde_cbor::Error) -> Self {
64 Self::Cbor(err)
65 }
66}
67
68impl From<serde_json::Error> for ReplicaQueryError {
69 fn from(err: serde_json::Error) -> Self {
71 Self::Json(err)
72 }
73}
74
/// Decides whether a network selection should be served by the direct
/// replica query path.
///
/// Returns `true` when no network is given, when it is exactly `"local"`,
/// or when it is a plain `http://` URL; everything else yields `false`.
#[must_use]
pub fn should_use_local_replica_query(network: Option<&str>) -> bool {
    match network {
        None => true,
        Some(name) => name == "local" || name.starts_with("http://"),
    }
}
80
81pub fn query_ready(network: Option<&str>, canister: &str) -> Result<bool, ReplicaQueryError> {
83 let bytes = local_query(network, canister, "canic_ready")?;
84 Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
85}
86
87#[must_use]
89pub fn parse_ready_json_value(data: &serde_json::Value) -> bool {
90 match data {
91 serde_json::Value::Bool(value) => *value,
92 serde_json::Value::String(value) => value.trim() == "(true)",
93 serde_json::Value::Array(values) => values.iter().any(parse_ready_json_value),
94 serde_json::Value::Object(map) => map.values().any(parse_ready_json_value),
95 _ => false,
96 }
97}
98
99pub fn query_subnet_registry_json(
101 network: Option<&str>,
102 root: &str,
103) -> Result<String, ReplicaQueryError> {
104 let bytes = local_query(network, root, "canic_subnet_registry")?;
105 let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
106 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
107 let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
108 serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
109}
110
111fn local_query(
113 network: Option<&str>,
114 canister: &str,
115 method: &str,
116) -> Result<Vec<u8>, ReplicaQueryError> {
117 let canister_id =
118 Principal::from_text(canister).map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
119 let arg = Encode!().map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
120 let sender = Principal::anonymous();
121 let envelope = QueryEnvelope {
122 content: QueryContent {
123 request_type: "query",
124 canister_id: canister_id.as_slice(),
125 method_name: method,
126 arg: &arg,
127 sender: sender.as_slice(),
128 ingress_expiry: ingress_expiry_nanos()?,
129 },
130 };
131 let body = serde_cbor::to_vec(&envelope)?;
132 let endpoint = local_replica_endpoint(network);
133 let response = post_cbor(
134 &endpoint,
135 &format!("/api/v2/canister/{canister}/query"),
136 &body,
137 )?;
138 let query_response = serde_cbor::from_slice::<QueryResponse>(&response)?;
139
140 if query_response.status == "replied" {
141 return query_response
142 .reply
143 .map(|reply| reply.arg)
144 .ok_or_else(|| ReplicaQueryError::Query("missing query reply".to_string()));
145 }
146
147 Err(ReplicaQueryError::Rejected {
148 code: query_response.reject_code.unwrap_or_default(),
149 message: query_response.reject_message.unwrap_or_default(),
150 })
151}
152
/// Resolves the base URL to send queries to.
///
/// A network given as an `http://` URL is used as-is with trailing slashes
/// removed; anything else (including `None`) falls back to
/// `http://127.0.0.1:8000`.
fn local_replica_endpoint(network: Option<&str>) -> String {
    match network {
        Some(url) if url.starts_with("http://") => url.trim_end_matches('/').to_string(),
        _ => "http://127.0.0.1:8000".to_string(),
    }
}
161
162fn ingress_expiry_nanos() -> Result<u64, ReplicaQueryError> {
164 let now = SystemTime::now()
165 .duration_since(UNIX_EPOCH)
166 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
167 let expiry = now
168 .as_nanos()
169 .saturating_add(5 * 60 * 1_000_000_000)
170 .min(u128::from(u64::MAX));
171 u64::try_from(expiry).map_err(|err| ReplicaQueryError::Query(err.to_string()))
172}
173
174fn post_cbor(endpoint: &str, path: &str, body: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
176 let (host, port) = parse_http_endpoint(endpoint)?;
177 let mut stream = TcpStream::connect((host.as_str(), port))?;
178 let request = format!(
179 "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/cbor\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
180 body.len()
181 );
182 stream.write_all(request.as_bytes())?;
183 stream.write_all(body)?;
184
185 let mut response = Vec::new();
186 stream.read_to_end(&mut response)?;
187 split_http_body(&response)
188}
189
190fn parse_http_endpoint(endpoint: &str) -> Result<(String, u16), ReplicaQueryError> {
192 let rest = endpoint
193 .strip_prefix("http://")
194 .ok_or_else(|| ReplicaQueryError::Query(format!("unsupported endpoint {endpoint}")))?;
195 let authority = rest.split('/').next().unwrap_or(rest);
196 let (host, port) = authority
197 .rsplit_once(':')
198 .ok_or_else(|| ReplicaQueryError::Query(format!("missing port in {endpoint}")))?;
199 let port = port
200 .parse::<u16>()
201 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
202 Ok((host.to_string(), port))
203}
204
205fn split_http_body(response: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
207 let marker = b"\r\n\r\n";
208 let Some(index) = response
209 .windows(marker.len())
210 .position(|window| window == marker)
211 else {
212 return Err(ReplicaQueryError::Query(
213 "malformed HTTP response".to_string(),
214 ));
215 };
216 let header = String::from_utf8_lossy(&response[..index]);
217 let status_ok = header
218 .lines()
219 .next()
220 .is_some_and(|status| status.contains(" 2"));
221 if !status_ok {
222 return Err(ReplicaQueryError::Query(header.to_string()));
223 }
224 Ok(response[index + marker.len()..].to_vec())
225}
226
/// Outer wrapper of a query request; it is serialized to CBOR and used as
/// the HTTP request body.
#[derive(Serialize)]
struct QueryEnvelope<'a> {
    /// The request itself.
    content: QueryContent<'a>,
}
235
/// Fields of a query request, borrowed from the caller for the duration of
/// serialization.
#[derive(Serialize)]
struct QueryContent<'a> {
    /// Request kind; this file only ever sends `"query"`.
    request_type: &'static str,
    /// Raw bytes of the target canister's principal.
    #[serde(with = "serde_bytes")]
    canister_id: &'a [u8],
    /// Name of the canister method to call.
    method_name: &'a str,
    /// Candid-encoded argument bytes.
    #[serde(with = "serde_bytes")]
    arg: &'a [u8],
    /// Raw bytes of the calling principal.
    #[serde(with = "serde_bytes")]
    sender: &'a [u8],
    /// Expiry of the request, in nanoseconds since the Unix epoch.
    ingress_expiry: u64,
}
252
/// CBOR response to a query request, as decoded by this file.
#[derive(Deserialize)]
struct QueryResponse {
    /// Outcome tag; `"replied"` indicates that `reply` should be present.
    status: String,
    /// Reply payload, if any.
    reply: Option<QueryReply>,
    /// Reject code, if the response carries one.
    reject_code: Option<u64>,
    /// Reject message, if the response carries one.
    reject_message: Option<String>,
}
264
/// Payload of a successful query response.
#[derive(Deserialize)]
struct QueryReply {
    /// Reply bytes; callers in this file decode them as Candid.
    #[serde(with = "serde_bytes")]
    arg: Vec<u8>,
}
274
275#[derive(CandidType, Deserialize)]
280struct SubnetRegistryResponseWire(Vec<SubnetRegistryEntryWire>);
281
282impl SubnetRegistryResponseWire {
283 fn to_cli_json(&self) -> serde_json::Value {
285 serde_json::json!({
286 "Ok": self.0.iter().map(SubnetRegistryEntryWire::to_cli_json).collect::<Vec<_>>()
287 })
288 }
289}
290
291#[derive(CandidType, Deserialize)]
296struct SubnetRegistryEntryWire {
297 pid: Principal,
298 role: String,
299 record: CanisterInfoWire,
300}
301
302impl SubnetRegistryEntryWire {
303 fn to_cli_json(&self) -> serde_json::Value {
305 serde_json::json!({
306 "pid": self.pid.to_text(),
307 "role": self.role,
308 "record": self.record.to_cli_json(),
309 })
310 }
311}
312
313#[derive(CandidType, Deserialize)]
318struct CanisterInfoWire {
319 pid: Principal,
320 role: String,
321 parent_pid: Option<Principal>,
322 module_hash: Option<Vec<u8>>,
323 created_at: u64,
324}
325
326impl CanisterInfoWire {
327 fn to_cli_json(&self) -> serde_json::Value {
329 serde_json::json!({
330 "pid": self.pid.to_text(),
331 "role": self.role,
332 "parent_pid": self.parent_pid.as_ref().map(Principal::to_text),
333 "module_hash": self.module_hash,
334 "created_at": self.created_at.to_string(),
335 })
336 }
337}
338
339#[derive(CandidType, Deserialize)]
344struct CanicErrorWire {
345 code: ErrorCodeWire,
346 message: String,
347}
348
349impl fmt::Display for CanicErrorWire {
350 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
352 write!(formatter, "{:?}: {}", self.code, self.message)
353 }
354}
355
/// Error categories carried by [`CanicErrorWire`].
///
/// Values are decoded from Candid and shown through their `Debug` form when
/// a `CanicErrorWire` is displayed.
// NOTE(review): the variant set presumably mirrors the canister-side error
// code enum — confirm against that definition before adding or renaming.
#[derive(CandidType, Debug, Deserialize)]
enum ErrorCodeWire {
    Conflict,
    Forbidden,
    Internal,
    InvalidInput,
    InvariantViolation,
    NotFound,
    PolicyInstanceRequiresSingletonWithDirectory,
    PolicyReplicaRequiresSingletonWithScaling,
    PolicyRoleAlreadyRegistered,
    PolicyShardRequiresSingletonWithSharding,
    PolicySingletonAlreadyRegisteredUnderParent,
    ResourceExhausted,
    Unauthorized,
    Unavailable,
}
377
#[cfg(test)]
mod tests {
    use super::*;

    // Readiness is detected for a bare `true`, for `true` nested in objects
    // and arrays, and for the textual Candid form `(true)`.
    #[test]
    fn parse_ready_json_value_accepts_nested_true_shapes() {
        let ready_shapes = [
            serde_json::json!(true),
            serde_json::json!({ "Ok": true }),
            serde_json::json!([{ "Ok": true }]),
            serde_json::json!({
                "response_candid": "(true)"
            }),
        ];
        for shape in &ready_shapes {
            assert!(parse_ready_json_value(shape));
        }
    }

    // `false` values and a bare "true" string (without parentheses) are not
    // treated as ready.
    #[test]
    fn parse_ready_json_value_rejects_false_shapes() {
        let not_ready_shapes = [
            serde_json::json!(false),
            serde_json::json!({ "Ok": false }),
            serde_json::json!("true"),
        ];
        for shape in &not_ready_shapes {
            assert!(!parse_ready_json_value(shape));
        }
    }

    // Without a network the default endpoint is used; an explicit URL loses
    // its trailing slash.
    #[test]
    fn local_replica_endpoint_defaults_to_icp_cli_port() {
        let default_endpoint = local_replica_endpoint(None);
        assert_eq!(default_endpoint, "http://127.0.0.1:8000");

        let explicit_endpoint = local_replica_endpoint(Some("http://127.0.0.1:9000/"));
        assert_eq!(explicit_endpoint, "http://127.0.0.1:9000");
    }
}