1use crate::icp_config::{
2 DEFAULT_LOCAL_GATEWAY_PORT, configured_local_gateway_port,
3 configured_local_gateway_port_from_root,
4};
5use candid::{CandidType, Decode, Encode, Principal};
6use serde::{Deserialize, Serialize};
7use std::{
8 error::Error,
9 fmt,
10 io::{Read, Write},
11 net::TcpStream,
12 path::Path,
13 time::{SystemTime, UNIX_EPOCH},
14};
15
16#[derive(Debug)]
21pub enum ReplicaQueryError {
22 Io(std::io::Error),
23 Cbor(serde_cbor::Error),
24 Json(serde_json::Error),
25 Query(String),
26 Rejected { code: u64, message: String },
27}
28
29impl fmt::Display for ReplicaQueryError {
30 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32 match self {
33 Self::Io(err) => write!(formatter, "{err}"),
34 Self::Cbor(err) => write!(formatter, "{err}"),
35 Self::Json(err) => write!(formatter, "{err}"),
36 Self::Query(message) => write!(formatter, "{message}"),
37 Self::Rejected { code, message } => {
38 write!(
39 formatter,
40 "local replica rejected query: code={code} message={message}"
41 )
42 }
43 }
44 }
45}
46
47impl Error for ReplicaQueryError {
48 fn source(&self) -> Option<&(dyn Error + 'static)> {
50 match self {
51 Self::Io(err) => Some(err),
52 Self::Cbor(err) => Some(err),
53 Self::Json(err) => Some(err),
54 Self::Query(_) | Self::Rejected { .. } => None,
55 }
56 }
57}
58
59impl From<std::io::Error> for ReplicaQueryError {
60 fn from(err: std::io::Error) -> Self {
62 Self::Io(err)
63 }
64}
65
66impl From<serde_cbor::Error> for ReplicaQueryError {
67 fn from(err: serde_cbor::Error) -> Self {
69 Self::Cbor(err)
70 }
71}
72
73impl From<serde_json::Error> for ReplicaQueryError {
74 fn from(err: serde_json::Error) -> Self {
76 Self::Json(err)
77 }
78}
79
/// Reports whether `network` should be served by the direct replica query
/// path: no network at all, the literal `local`, or any `http://` URL.
#[must_use]
pub fn should_use_local_replica_query(network: Option<&str>) -> bool {
    match network {
        None => true,
        Some(name) => name == "local" || name.starts_with("http://"),
    }
}
85
86pub fn query_ready(network: Option<&str>, canister: &str) -> Result<bool, ReplicaQueryError> {
88 let bytes = local_query(network, canister, "canic_ready")?;
89 Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
90}
91
92pub fn query_ready_from_root(
94 network: Option<&str>,
95 canister: &str,
96 icp_root: &Path,
97) -> Result<bool, ReplicaQueryError> {
98 let bytes = local_query_from_root(network, canister, "canic_ready", icp_root)?;
99 Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
100}
101
102#[must_use]
104pub fn local_replica_status_reachable_from_root(network: Option<&str>, icp_root: &Path) -> bool {
105 get_http_status(&local_replica_endpoint_from_root(network, icp_root)).is_ok()
106}
107
108#[must_use]
110pub fn local_replica_endpoint_from_root(network: Option<&str>, icp_root: &Path) -> String {
111 local_replica_endpoint_with_port(
112 network,
113 configured_local_gateway_port_from_root(icp_root).ok(),
114 )
115}
116
117pub fn local_replica_root_key_from_root(
119 network: Option<&str>,
120 icp_root: &Path,
121) -> Result<Option<String>, ReplicaQueryError> {
122 let endpoint = local_replica_endpoint_from_root(network, icp_root);
123 let body = get_http_status(&endpoint)?;
124 let value = serde_json::from_slice::<serde_json::Value>(&body)?;
125 Ok(value
126 .get("root_key")
127 .and_then(serde_json::Value::as_str)
128 .filter(|root_key| !root_key.is_empty())
129 .map(str::to_string))
130}
131
132#[must_use]
134pub fn parse_ready_json_value(data: &serde_json::Value) -> bool {
135 match data {
136 serde_json::Value::Bool(value) => *value,
137 serde_json::Value::String(value) => value.trim() == "(true)",
138 serde_json::Value::Array(values) => values.iter().any(parse_ready_json_value),
139 serde_json::Value::Object(map) => map.values().any(parse_ready_json_value),
140 _ => false,
141 }
142}
143
144pub fn query_subnet_registry_json(
146 network: Option<&str>,
147 root: &str,
148) -> Result<String, ReplicaQueryError> {
149 let bytes = local_query(network, root, "canic_subnet_registry")?;
150 let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
151 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
152 let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
153 serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
154}
155
156pub fn query_subnet_registry_json_from_root(
158 network: Option<&str>,
159 root: &str,
160 icp_root: &Path,
161) -> Result<String, ReplicaQueryError> {
162 let bytes = local_query_from_root(network, root, "canic_subnet_registry", icp_root)?;
163 let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
164 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
165 let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
166 serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
167}
168
169fn local_query(
171 network: Option<&str>,
172 canister: &str,
173 method: &str,
174) -> Result<Vec<u8>, ReplicaQueryError> {
175 local_query_with_endpoint(network, canister, method, local_replica_endpoint(network))
176}
177
178fn local_query_from_root(
179 network: Option<&str>,
180 canister: &str,
181 method: &str,
182 icp_root: &Path,
183) -> Result<Vec<u8>, ReplicaQueryError> {
184 local_query_with_endpoint(
185 network,
186 canister,
187 method,
188 local_replica_endpoint_from_root(network, icp_root),
189 )
190}
191
192fn local_query_with_endpoint(
193 _network: Option<&str>,
194 canister: &str,
195 method: &str,
196 endpoint: String,
197) -> Result<Vec<u8>, ReplicaQueryError> {
198 let canister_id =
199 Principal::from_text(canister).map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
200 let arg = Encode!().map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
201 let sender = Principal::anonymous();
202 let envelope = QueryEnvelope {
203 content: QueryContent {
204 request_type: "query",
205 canister_id: canister_id.as_slice(),
206 method_name: method,
207 arg: &arg,
208 sender: sender.as_slice(),
209 ingress_expiry: ingress_expiry_nanos()?,
210 },
211 };
212 let body = serde_cbor::to_vec(&envelope)?;
213 let response = post_cbor(
214 &endpoint,
215 &format!("/api/v2/canister/{canister}/query"),
216 &body,
217 )?;
218 let query_response = serde_cbor::from_slice::<QueryResponse>(&response)?;
219
220 if query_response.status == "replied" {
221 return query_response
222 .reply
223 .map(|reply| reply.arg)
224 .ok_or_else(|| ReplicaQueryError::Query("missing query reply".to_string()));
225 }
226
227 Err(ReplicaQueryError::Rejected {
228 code: query_response.reject_code.unwrap_or_default(),
229 message: query_response.reject_message.unwrap_or_default(),
230 })
231}
232
233fn local_replica_endpoint(network: Option<&str>) -> String {
235 local_replica_endpoint_with_port(network, configured_local_gateway_port().ok())
236}
237
238fn local_replica_endpoint_with_port(network: Option<&str>, configured_port: Option<u16>) -> String {
240 if let Some(network) = network.filter(|network| network.starts_with("http://")) {
241 return network.trim_end_matches('/').to_string();
242 }
243
244 let port = configured_port.unwrap_or(DEFAULT_LOCAL_GATEWAY_PORT);
245 format!("http://127.0.0.1:{port}")
246}
247
248fn ingress_expiry_nanos() -> Result<u64, ReplicaQueryError> {
250 let now = SystemTime::now()
251 .duration_since(UNIX_EPOCH)
252 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
253 let expiry = now
254 .as_nanos()
255 .saturating_add(5 * 60 * 1_000_000_000)
256 .min(u128::from(u64::MAX));
257 u64::try_from(expiry).map_err(|err| ReplicaQueryError::Query(err.to_string()))
258}
259
260fn post_cbor(endpoint: &str, path: &str, body: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
262 let (host, port) = parse_http_endpoint(endpoint)?;
263 let mut stream = TcpStream::connect((host.as_str(), port))?;
264 let request = format!(
265 "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/cbor\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
266 body.len()
267 );
268 stream.write_all(request.as_bytes())?;
269 stream.write_all(body)?;
270
271 let mut response = Vec::new();
272 stream.read_to_end(&mut response)?;
273 split_http_body(&response)
274}
275
276fn get_http_status(endpoint: &str) -> Result<Vec<u8>, ReplicaQueryError> {
277 let (host, port) = parse_http_endpoint(endpoint)?;
278 let mut stream = TcpStream::connect((host.as_str(), port))?;
279 let request =
280 format!("GET /api/v2/status HTTP/1.1\r\nHost: {host}:{port}\r\nConnection: close\r\n\r\n");
281 stream.write_all(request.as_bytes())?;
282
283 let mut response = Vec::new();
284 stream.read_to_end(&mut response)?;
285 split_http_body(&response)
286}
287
288fn parse_http_endpoint(endpoint: &str) -> Result<(String, u16), ReplicaQueryError> {
290 let rest = endpoint
291 .strip_prefix("http://")
292 .ok_or_else(|| ReplicaQueryError::Query(format!("unsupported endpoint {endpoint}")))?;
293 let authority = rest.split('/').next().unwrap_or(rest);
294 let (host, port) = authority
295 .rsplit_once(':')
296 .ok_or_else(|| ReplicaQueryError::Query(format!("missing port in {endpoint}")))?;
297 let port = port
298 .parse::<u16>()
299 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
300 Ok((host.to_string(), port))
301}
302
303fn split_http_body(response: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
305 let marker = b"\r\n\r\n";
306 let Some(index) = response
307 .windows(marker.len())
308 .position(|window| window == marker)
309 else {
310 return Err(ReplicaQueryError::Query(
311 "malformed HTTP response".to_string(),
312 ));
313 };
314 let header = String::from_utf8_lossy(&response[..index]);
315 let status_ok = header
316 .lines()
317 .next()
318 .is_some_and(|status| status.contains(" 2"));
319 if !status_ok {
320 return Err(ReplicaQueryError::Query(header.to_string()));
321 }
322 Ok(response[index + marker.len()..].to_vec())
323}
324
325#[derive(Serialize)]
330struct QueryEnvelope<'a> {
331 content: QueryContent<'a>,
332}
333
334#[derive(Serialize)]
339struct QueryContent<'a> {
340 request_type: &'static str,
341 #[serde(with = "serde_bytes")]
342 canister_id: &'a [u8],
343 method_name: &'a str,
344 #[serde(with = "serde_bytes")]
345 arg: &'a [u8],
346 #[serde(with = "serde_bytes")]
347 sender: &'a [u8],
348 ingress_expiry: u64,
349}
350
351#[derive(Deserialize)]
356struct QueryResponse {
357 status: String,
358 reply: Option<QueryReply>,
359 reject_code: Option<u64>,
360 reject_message: Option<String>,
361}
362
363#[derive(Deserialize)]
368struct QueryReply {
369 #[serde(with = "serde_bytes")]
370 arg: Vec<u8>,
371}
372
373#[derive(CandidType, Deserialize)]
378struct SubnetRegistryResponseWire(Vec<SubnetRegistryEntryWire>);
379
380impl SubnetRegistryResponseWire {
381 fn to_cli_json(&self) -> serde_json::Value {
383 serde_json::json!({
384 "Ok": self.0.iter().map(SubnetRegistryEntryWire::to_cli_json).collect::<Vec<_>>()
385 })
386 }
387}
388
389#[derive(CandidType, Deserialize)]
394struct SubnetRegistryEntryWire {
395 pid: Principal,
396 role: String,
397 record: CanisterInfoWire,
398}
399
400impl SubnetRegistryEntryWire {
401 fn to_cli_json(&self) -> serde_json::Value {
403 serde_json::json!({
404 "pid": self.pid.to_text(),
405 "role": self.role,
406 "record": self.record.to_cli_json(),
407 })
408 }
409}
410
411#[derive(CandidType, Deserialize)]
416struct CanisterInfoWire {
417 pid: Principal,
418 role: String,
419 parent_pid: Option<Principal>,
420 module_hash: Option<Vec<u8>>,
421 created_at: u64,
422}
423
424impl CanisterInfoWire {
425 fn to_cli_json(&self) -> serde_json::Value {
427 serde_json::json!({
428 "pid": self.pid.to_text(),
429 "role": self.role,
430 "parent_pid": self.parent_pid.as_ref().map(Principal::to_text),
431 "module_hash": self.module_hash,
432 "created_at": self.created_at.to_string(),
433 })
434 }
435}
436
437#[derive(CandidType, Deserialize)]
442struct CanicErrorWire {
443 code: ErrorCodeWire,
444 message: String,
445}
446
447impl fmt::Display for CanicErrorWire {
448 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
450 write!(formatter, "{:?}: {}", self.code, self.message)
451 }
452}
453
454#[derive(CandidType, Debug, Deserialize)]
459enum ErrorCodeWire {
460 Conflict,
461 Forbidden,
462 Internal,
463 InvalidInput,
464 InvariantViolation,
465 NotFound,
466 PolicyInstanceRequiresSingletonWithDirectory,
467 PolicyReplicaRequiresSingletonWithScaling,
468 PolicyRoleAlreadyRegistered,
469 PolicyShardRequiresSingletonWithSharding,
470 PolicySingletonAlreadyRegisteredUnderParent,
471 ResourceExhausted,
472 Unauthorized,
473 Unavailable,
474}
475
#[cfg(test)]
mod tests {
    use super::*;

    // Every shape that carries a truthy readiness marker must be accepted.
    #[test]
    fn parse_ready_json_value_accepts_nested_true_shapes() {
        let ready_shapes = [
            serde_json::json!(true),
            serde_json::json!({ "Ok": true }),
            serde_json::json!([{ "Ok": true }]),
            serde_json::json!({ "response_candid": "(true)" }),
        ];
        for shape in &ready_shapes {
            assert!(parse_ready_json_value(shape), "expected ready: {shape}");
        }
    }

    // Falsy values and the bare string "true" must not count as ready.
    #[test]
    fn parse_ready_json_value_rejects_false_shapes() {
        let not_ready_shapes = [
            serde_json::json!(false),
            serde_json::json!({ "Ok": false }),
            serde_json::json!("true"),
        ];
        for shape in &not_ready_shapes {
            assert!(!parse_ready_json_value(shape), "expected not ready: {shape}");
        }
    }

    // Default port, configured port, and explicit URL, in that order.
    #[test]
    fn local_replica_endpoint_defaults_to_icp_cli_port() {
        let cases: [(Option<&str>, Option<u16>, &str); 3] = [
            (None, None, "http://127.0.0.1:8000"),
            (None, Some(8001), "http://127.0.0.1:8001"),
            (
                Some("http://127.0.0.1:9000/"),
                Some(8001),
                "http://127.0.0.1:9000",
            ),
        ];
        for (network, configured_port, expected) in cases {
            assert_eq!(
                local_replica_endpoint_with_port(network, configured_port),
                expected
            );
        }
    }
}