1use crate::icp_config::{
2 DEFAULT_LOCAL_GATEWAY_PORT, configured_local_gateway_port,
3 configured_local_gateway_port_from_root,
4};
5use candid::{CandidType, Decode, Encode, Principal};
6use serde::{Deserialize, Serialize};
7use std::{
8 error::Error,
9 fmt,
10 io::{Read, Write},
11 net::TcpStream,
12 path::Path,
13 time::{SystemTime, UNIX_EPOCH},
14};
15
16#[derive(Debug)]
21pub enum ReplicaQueryError {
22 Io(std::io::Error),
23 Cbor(serde_cbor::Error),
24 Json(serde_json::Error),
25 Query(String),
26 Rejected { code: u64, message: String },
27}
28
29impl fmt::Display for ReplicaQueryError {
30 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32 match self {
33 Self::Io(err) => write!(formatter, "{err}"),
34 Self::Cbor(err) => write!(formatter, "{err}"),
35 Self::Json(err) => write!(formatter, "{err}"),
36 Self::Query(message) => write!(formatter, "{message}"),
37 Self::Rejected { code, message } => {
38 write!(
39 formatter,
40 "local replica rejected query: code={code} message={message}"
41 )
42 }
43 }
44 }
45}
46
47impl Error for ReplicaQueryError {
48 fn source(&self) -> Option<&(dyn Error + 'static)> {
50 match self {
51 Self::Io(err) => Some(err),
52 Self::Cbor(err) => Some(err),
53 Self::Json(err) => Some(err),
54 Self::Query(_) | Self::Rejected { .. } => None,
55 }
56 }
57}
58
59impl From<std::io::Error> for ReplicaQueryError {
60 fn from(err: std::io::Error) -> Self {
62 Self::Io(err)
63 }
64}
65
66impl From<serde_cbor::Error> for ReplicaQueryError {
67 fn from(err: serde_cbor::Error) -> Self {
69 Self::Cbor(err)
70 }
71}
72
73impl From<serde_json::Error> for ReplicaQueryError {
74 fn from(err: serde_json::Error) -> Self {
76 Self::Json(err)
77 }
78}
79
80#[must_use]
82pub fn should_use_local_replica_query(network: Option<&str>) -> bool {
83 network.is_none_or(|network| network == "local" || network.starts_with("http://"))
84}
85
86pub fn query_ready(network: Option<&str>, canister: &str) -> Result<bool, ReplicaQueryError> {
88 let bytes = local_query(network, canister, "canic_ready")?;
89 Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
90}
91
92pub fn query_ready_from_root(
94 network: Option<&str>,
95 canister: &str,
96 icp_root: &Path,
97) -> Result<bool, ReplicaQueryError> {
98 let bytes = local_query_from_root(network, canister, "canic_ready", icp_root)?;
99 Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
100}
101
102#[must_use]
104pub fn local_replica_status_reachable_from_root(network: Option<&str>, icp_root: &Path) -> bool {
105 get_http_status(&local_replica_endpoint_from_root(network, icp_root)).is_ok()
106}
107
108#[must_use]
110pub fn local_replica_endpoint_from_root(network: Option<&str>, icp_root: &Path) -> String {
111 local_replica_endpoint_with_port(
112 network,
113 configured_local_gateway_port_from_root(icp_root).ok(),
114 )
115}
116
117pub fn local_replica_root_key_from_root(
119 network: Option<&str>,
120 icp_root: &Path,
121) -> Result<Option<String>, ReplicaQueryError> {
122 let endpoint = local_replica_endpoint_from_root(network, icp_root);
123 let body = get_http_status(&endpoint)?;
124 Ok(parse_local_replica_root_key(&body))
125}
126
127fn parse_local_replica_root_key(body: &[u8]) -> Option<String> {
128 serde_json::from_slice::<serde_json::Value>(body)
129 .ok()
130 .and_then(|value| root_key_from_json(&value))
131 .or_else(|| {
132 serde_cbor::from_slice::<serde_cbor::Value>(body)
133 .ok()
134 .and_then(|value| root_key_from_cbor(&value))
135 })
136}
137
138fn root_key_from_json(value: &serde_json::Value) -> Option<String> {
139 match value {
140 serde_json::Value::String(text) => nonempty_text(text),
141 serde_json::Value::Array(values) => values.iter().find_map(root_key_from_json),
142 serde_json::Value::Object(map) => map
143 .get("root_key")
144 .and_then(root_key_from_json)
145 .or_else(|| map.values().find_map(root_key_from_json)),
146 _ => None,
147 }
148}
149
150fn root_key_from_cbor(value: &serde_cbor::Value) -> Option<String> {
151 match value {
152 serde_cbor::Value::Bytes(bytes) => (!bytes.is_empty()).then(|| hex_bytes(bytes)),
153 serde_cbor::Value::Text(text) => nonempty_text(text),
154 serde_cbor::Value::Array(values) => values.iter().find_map(root_key_from_cbor),
155 serde_cbor::Value::Map(map) => map
156 .iter()
157 .find_map(|(key, value)| match key {
158 serde_cbor::Value::Text(key) if key == "root_key" => root_key_from_cbor(value),
159 _ => None,
160 })
161 .or_else(|| map.values().find_map(root_key_from_cbor)),
162 _ => None,
163 }
164}
165
166fn nonempty_text(text: &str) -> Option<String> {
167 let trimmed = text.trim();
168 (!trimmed.is_empty()).then(|| trimmed.to_string())
169}
170
171fn hex_bytes(bytes: &[u8]) -> String {
172 let mut encoded = String::with_capacity(bytes.len() * 2);
173 for byte in bytes {
174 use std::fmt::Write as _;
175 let _ = write!(encoded, "{byte:02x}");
176 }
177 encoded
178}
179
180#[must_use]
182pub fn parse_ready_json_value(data: &serde_json::Value) -> bool {
183 match data {
184 serde_json::Value::Bool(value) => *value,
185 serde_json::Value::String(value) => value.trim() == "(true)",
186 serde_json::Value::Array(values) => values.iter().any(parse_ready_json_value),
187 serde_json::Value::Object(map) => map.values().any(parse_ready_json_value),
188 _ => false,
189 }
190}
191
192pub fn query_subnet_registry_json(
194 network: Option<&str>,
195 root: &str,
196) -> Result<String, ReplicaQueryError> {
197 let bytes = local_query(network, root, "canic_subnet_registry")?;
198 let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
199 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
200 let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
201 serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
202}
203
204pub fn query_subnet_registry_json_from_root(
206 network: Option<&str>,
207 root: &str,
208 icp_root: &Path,
209) -> Result<String, ReplicaQueryError> {
210 let bytes = local_query_from_root(network, root, "canic_subnet_registry", icp_root)?;
211 let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
212 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
213 let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
214 serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
215}
216
217fn local_query(
219 network: Option<&str>,
220 canister: &str,
221 method: &str,
222) -> Result<Vec<u8>, ReplicaQueryError> {
223 local_query_with_endpoint(network, canister, method, local_replica_endpoint(network))
224}
225
226fn local_query_from_root(
227 network: Option<&str>,
228 canister: &str,
229 method: &str,
230 icp_root: &Path,
231) -> Result<Vec<u8>, ReplicaQueryError> {
232 local_query_with_endpoint(
233 network,
234 canister,
235 method,
236 local_replica_endpoint_from_root(network, icp_root),
237 )
238}
239
240fn local_query_with_endpoint(
241 _network: Option<&str>,
242 canister: &str,
243 method: &str,
244 endpoint: String,
245) -> Result<Vec<u8>, ReplicaQueryError> {
246 let canister_id =
247 Principal::from_text(canister).map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
248 let arg = Encode!().map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
249 let sender = Principal::anonymous();
250 let envelope = QueryEnvelope {
251 content: QueryContent {
252 request_type: "query",
253 canister_id: canister_id.as_slice(),
254 method_name: method,
255 arg: &arg,
256 sender: sender.as_slice(),
257 ingress_expiry: ingress_expiry_nanos()?,
258 },
259 };
260 let body = serde_cbor::to_vec(&envelope)?;
261 let response = post_cbor(
262 &endpoint,
263 &format!("/api/v2/canister/{canister}/query"),
264 &body,
265 )?;
266 let query_response = serde_cbor::from_slice::<QueryResponse>(&response)?;
267
268 if query_response.status == "replied" {
269 return query_response
270 .reply
271 .map(|reply| reply.arg)
272 .ok_or_else(|| ReplicaQueryError::Query("missing query reply".to_string()));
273 }
274
275 Err(ReplicaQueryError::Rejected {
276 code: query_response.reject_code.unwrap_or_default(),
277 message: query_response.reject_message.unwrap_or_default(),
278 })
279}
280
281fn local_replica_endpoint(network: Option<&str>) -> String {
283 local_replica_endpoint_with_port(network, configured_local_gateway_port().ok())
284}
285
286fn local_replica_endpoint_with_port(network: Option<&str>, configured_port: Option<u16>) -> String {
288 if let Some(network) = network.filter(|network| network.starts_with("http://")) {
289 return network.trim_end_matches('/').to_string();
290 }
291
292 let port = configured_port.unwrap_or(DEFAULT_LOCAL_GATEWAY_PORT);
293 format!("http://127.0.0.1:{port}")
294}
295
296fn ingress_expiry_nanos() -> Result<u64, ReplicaQueryError> {
298 let now = SystemTime::now()
299 .duration_since(UNIX_EPOCH)
300 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
301 let expiry = now
302 .as_nanos()
303 .saturating_add(5 * 60 * 1_000_000_000)
304 .min(u128::from(u64::MAX));
305 u64::try_from(expiry).map_err(|err| ReplicaQueryError::Query(err.to_string()))
306}
307
308fn post_cbor(endpoint: &str, path: &str, body: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
310 let (host, port) = parse_http_endpoint(endpoint)?;
311 let mut stream = TcpStream::connect((host.as_str(), port))?;
312 let request = format!(
313 "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/cbor\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
314 body.len()
315 );
316 stream.write_all(request.as_bytes())?;
317 stream.write_all(body)?;
318
319 let mut response = Vec::new();
320 stream.read_to_end(&mut response)?;
321 split_http_body(&response)
322}
323
324fn get_http_status(endpoint: &str) -> Result<Vec<u8>, ReplicaQueryError> {
325 let (host, port) = parse_http_endpoint(endpoint)?;
326 let mut stream = TcpStream::connect((host.as_str(), port))?;
327 let request =
328 format!("GET /api/v2/status HTTP/1.1\r\nHost: {host}:{port}\r\nConnection: close\r\n\r\n");
329 stream.write_all(request.as_bytes())?;
330
331 let mut response = Vec::new();
332 stream.read_to_end(&mut response)?;
333 split_http_body(&response)
334}
335
336fn parse_http_endpoint(endpoint: &str) -> Result<(String, u16), ReplicaQueryError> {
338 let rest = endpoint
339 .strip_prefix("http://")
340 .ok_or_else(|| ReplicaQueryError::Query(format!("unsupported endpoint {endpoint}")))?;
341 let authority = rest.split('/').next().unwrap_or(rest);
342 let (host, port) = authority
343 .rsplit_once(':')
344 .ok_or_else(|| ReplicaQueryError::Query(format!("missing port in {endpoint}")))?;
345 let port = port
346 .parse::<u16>()
347 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
348 Ok((host.to_string(), port))
349}
350
351fn split_http_body(response: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
353 let marker = b"\r\n\r\n";
354 let Some(index) = response
355 .windows(marker.len())
356 .position(|window| window == marker)
357 else {
358 return Err(ReplicaQueryError::Query(
359 "malformed HTTP response".to_string(),
360 ));
361 };
362 let header = String::from_utf8_lossy(&response[..index]);
363 let status_ok = header
364 .lines()
365 .next()
366 .is_some_and(|status| status.contains(" 2"));
367 if !status_ok {
368 return Err(ReplicaQueryError::Query(header.to_string()));
369 }
370 Ok(response[index + marker.len()..].to_vec())
371}
372
373#[derive(Serialize)]
378struct QueryEnvelope<'a> {
379 content: QueryContent<'a>,
380}
381
382#[derive(Serialize)]
387struct QueryContent<'a> {
388 request_type: &'static str,
389 #[serde(with = "serde_bytes")]
390 canister_id: &'a [u8],
391 method_name: &'a str,
392 #[serde(with = "serde_bytes")]
393 arg: &'a [u8],
394 #[serde(with = "serde_bytes")]
395 sender: &'a [u8],
396 ingress_expiry: u64,
397}
398
399#[derive(Deserialize)]
404struct QueryResponse {
405 status: String,
406 reply: Option<QueryReply>,
407 reject_code: Option<u64>,
408 reject_message: Option<String>,
409}
410
411#[derive(Deserialize)]
416struct QueryReply {
417 #[serde(with = "serde_bytes")]
418 arg: Vec<u8>,
419}
420
421#[derive(CandidType, Deserialize)]
426struct SubnetRegistryResponseWire(Vec<SubnetRegistryEntryWire>);
427
428impl SubnetRegistryResponseWire {
429 fn to_cli_json(&self) -> serde_json::Value {
431 serde_json::json!({
432 "Ok": self.0.iter().map(SubnetRegistryEntryWire::to_cli_json).collect::<Vec<_>>()
433 })
434 }
435}
436
437#[derive(CandidType, Deserialize)]
442struct SubnetRegistryEntryWire {
443 pid: Principal,
444 role: String,
445 record: CanisterInfoWire,
446}
447
448impl SubnetRegistryEntryWire {
449 fn to_cli_json(&self) -> serde_json::Value {
451 serde_json::json!({
452 "pid": self.pid.to_text(),
453 "role": self.role,
454 "record": self.record.to_cli_json(),
455 })
456 }
457}
458
459#[derive(CandidType, Deserialize)]
464struct CanisterInfoWire {
465 pid: Principal,
466 role: String,
467 parent_pid: Option<Principal>,
468 module_hash: Option<Vec<u8>>,
469 created_at: u64,
470}
471
472impl CanisterInfoWire {
473 fn to_cli_json(&self) -> serde_json::Value {
475 serde_json::json!({
476 "pid": self.pid.to_text(),
477 "role": self.role,
478 "parent_pid": self.parent_pid.as_ref().map(Principal::to_text),
479 "module_hash": self.module_hash,
480 "created_at": self.created_at.to_string(),
481 })
482 }
483}
484
485#[derive(CandidType, Deserialize)]
490struct CanicErrorWire {
491 code: ErrorCodeWire,
492 message: String,
493}
494
495impl fmt::Display for CanicErrorWire {
496 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
498 write!(formatter, "{:?}: {}", self.code, self.message)
499 }
500}
501
502#[derive(CandidType, Debug, Deserialize)]
507enum ErrorCodeWire {
508 Conflict,
509 Forbidden,
510 Internal,
511 InvalidInput,
512 InvariantViolation,
513 NotFound,
514 PolicyInstanceRequiresSingletonWithDirectory,
515 PolicyReplicaRequiresSingletonWithScaling,
516 PolicyRoleAlreadyRegistered,
517 PolicyShardRequiresSingletonWithSharding,
518 PolicySingletonAlreadyRegisteredUnderParent,
519 ResourceExhausted,
520 Unauthorized,
521 Unavailable,
522}
523
524#[cfg(test)]
525mod tests {
526 use super::*;
527
528 #[test]
530 fn parse_ready_json_value_accepts_nested_true_shapes() {
531 assert!(parse_ready_json_value(&serde_json::json!(true)));
532 assert!(parse_ready_json_value(&serde_json::json!({ "Ok": true })));
533 assert!(parse_ready_json_value(&serde_json::json!([{ "Ok": true }])));
534 assert!(parse_ready_json_value(&serde_json::json!({
535 "response_candid": "(true)"
536 })));
537 }
538
539 #[test]
541 fn parse_ready_json_value_rejects_false_shapes() {
542 assert!(!parse_ready_json_value(&serde_json::json!(false)));
543 assert!(!parse_ready_json_value(&serde_json::json!({ "Ok": false })));
544 assert!(!parse_ready_json_value(&serde_json::json!("true")));
545 }
546
547 #[test]
549 fn local_replica_endpoint_defaults_to_icp_cli_port() {
550 assert_eq!(
551 local_replica_endpoint_with_port(None, None),
552 "http://127.0.0.1:8000"
553 );
554 assert_eq!(
555 local_replica_endpoint_with_port(None, Some(8001)),
556 "http://127.0.0.1:8001"
557 );
558 assert_eq!(
559 local_replica_endpoint_with_port(Some("http://127.0.0.1:9000/"), Some(8001)),
560 "http://127.0.0.1:9000"
561 );
562 }
563
564 #[test]
565 fn parses_local_replica_root_key_from_json_status() {
566 let root_key = parse_local_replica_root_key(br#"{"root_key":"308182"}"#);
567
568 assert_eq!(root_key.as_deref(), Some("308182"));
569 }
570
571 #[test]
572 fn parses_local_replica_root_key_from_cbor_status() {
573 #[derive(Serialize)]
574 struct Status {
575 #[serde(with = "serde_bytes")]
576 root_key: Vec<u8>,
577 }
578
579 let body = serde_cbor::to_vec(&Status {
580 root_key: vec![0x30, 0x81, 0x82],
581 })
582 .expect("encode cbor status");
583 let root_key = parse_local_replica_root_key(&body);
584
585 assert_eq!(root_key.as_deref(), Some("308182"));
586 }
587
588 #[test]
589 fn rejects_blank_local_replica_root_key_status_values() {
590 #[derive(Serialize)]
591 struct Status {
592 #[serde(with = "serde_bytes")]
593 root_key: Vec<u8>,
594 }
595
596 assert_eq!(parse_local_replica_root_key(br#"{"root_key":" "}"#), None);
597
598 let body = serde_cbor::to_vec(&Status { root_key: vec![] })
599 .expect("encode empty cbor status root key");
600
601 assert_eq!(parse_local_replica_root_key(&body), None);
602 }
603}