1use crate::icp_config::{
2 DEFAULT_LOCAL_GATEWAY_PORT, configured_local_gateway_port,
3 configured_local_gateway_port_from_root,
4};
5use candid::{CandidType, Decode, Encode, Principal};
6use serde::{Deserialize, Serialize};
7use std::{
8 error::Error,
9 fmt,
10 io::{Read, Write},
11 net::TcpStream,
12 path::Path,
13 time::{SystemTime, UNIX_EPOCH},
14};
15
16#[derive(Debug)]
21pub enum ReplicaQueryError {
22 Io(std::io::Error),
23 Cbor(serde_cbor::Error),
24 Json(serde_json::Error),
25 Query(String),
26 Rejected { code: u64, message: String },
27}
28
29impl fmt::Display for ReplicaQueryError {
30 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32 match self {
33 Self::Io(err) => write!(formatter, "{err}"),
34 Self::Cbor(err) => write!(formatter, "{err}"),
35 Self::Json(err) => write!(formatter, "{err}"),
36 Self::Query(message) => write!(formatter, "{message}"),
37 Self::Rejected { code, message } => {
38 write!(
39 formatter,
40 "local replica rejected query: code={code} message={message}"
41 )
42 }
43 }
44 }
45}
46
47impl Error for ReplicaQueryError {
48 fn source(&self) -> Option<&(dyn Error + 'static)> {
50 match self {
51 Self::Io(err) => Some(err),
52 Self::Cbor(err) => Some(err),
53 Self::Json(err) => Some(err),
54 Self::Query(_) | Self::Rejected { .. } => None,
55 }
56 }
57}
58
59impl From<std::io::Error> for ReplicaQueryError {
60 fn from(err: std::io::Error) -> Self {
62 Self::Io(err)
63 }
64}
65
66impl From<serde_cbor::Error> for ReplicaQueryError {
67 fn from(err: serde_cbor::Error) -> Self {
69 Self::Cbor(err)
70 }
71}
72
73impl From<serde_json::Error> for ReplicaQueryError {
74 fn from(err: serde_json::Error) -> Self {
76 Self::Json(err)
77 }
78}
79
80#[must_use]
82pub fn should_use_local_replica_query(network: Option<&str>) -> bool {
83 network.is_none_or(|network| network == "local" || network.starts_with("http://"))
84}
85
86pub fn query_ready(network: Option<&str>, canister: &str) -> Result<bool, ReplicaQueryError> {
88 let bytes = local_query(network, canister, "canic_ready")?;
89 Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
90}
91
92pub fn query_ready_from_root(
94 network: Option<&str>,
95 canister: &str,
96 icp_root: &Path,
97) -> Result<bool, ReplicaQueryError> {
98 let bytes = local_query_from_root(network, canister, "canic_ready", icp_root)?;
99 Decode!(&bytes, bool).map_err(|err| ReplicaQueryError::Query(err.to_string()))
100}
101
102#[must_use]
104pub fn local_replica_status_reachable_from_root(network: Option<&str>, icp_root: &Path) -> bool {
105 get_http_status(&local_replica_endpoint_from_root(network, icp_root)).is_ok()
106}
107
108#[must_use]
110pub fn local_replica_endpoint_from_root(network: Option<&str>, icp_root: &Path) -> String {
111 local_replica_endpoint_with_port(
112 network,
113 configured_local_gateway_port_from_root(icp_root).ok(),
114 )
115}
116
117pub fn local_replica_root_key_from_root(
119 network: Option<&str>,
120 icp_root: &Path,
121) -> Result<Option<String>, ReplicaQueryError> {
122 let endpoint = local_replica_endpoint_from_root(network, icp_root);
123 let body = get_http_status(&endpoint)?;
124 Ok(parse_local_replica_root_key(&body))
125}
126
127fn parse_local_replica_root_key(body: &[u8]) -> Option<String> {
128 serde_json::from_slice::<serde_json::Value>(body)
129 .ok()
130 .and_then(|value| root_key_from_json(&value))
131 .or_else(|| {
132 serde_cbor::from_slice::<serde_cbor::Value>(body)
133 .ok()
134 .and_then(|value| root_key_from_cbor(&value))
135 })
136}
137
138fn root_key_from_json(value: &serde_json::Value) -> Option<String> {
139 match value {
140 serde_json::Value::String(text) => nonempty_text(text),
141 serde_json::Value::Array(values) => values.iter().find_map(root_key_from_json),
142 serde_json::Value::Object(map) => map
143 .get("root_key")
144 .and_then(root_key_from_json)
145 .or_else(|| map.values().find_map(root_key_from_json)),
146 _ => None,
147 }
148}
149
150fn root_key_from_cbor(value: &serde_cbor::Value) -> Option<String> {
151 match value {
152 serde_cbor::Value::Bytes(bytes) => (!bytes.is_empty()).then(|| hex_bytes(bytes)),
153 serde_cbor::Value::Text(text) => nonempty_text(text),
154 serde_cbor::Value::Array(values) => values.iter().find_map(root_key_from_cbor),
155 serde_cbor::Value::Map(map) => map
156 .iter()
157 .find_map(|(key, value)| match key {
158 serde_cbor::Value::Text(key) if key == "root_key" => root_key_from_cbor(value),
159 _ => None,
160 })
161 .or_else(|| map.values().find_map(root_key_from_cbor)),
162 _ => None,
163 }
164}
165
166fn nonempty_text(text: &str) -> Option<String> {
167 let trimmed = text.trim();
168 (!trimmed.is_empty()).then(|| trimmed.to_string())
169}
170
171fn hex_bytes(bytes: &[u8]) -> String {
172 let mut encoded = String::with_capacity(bytes.len() * 2);
173 for byte in bytes {
174 use std::fmt::Write as _;
175 let _ = write!(encoded, "{byte:02x}");
176 }
177 encoded
178}
179
180#[must_use]
182pub fn parse_ready_json_value(data: &serde_json::Value) -> bool {
183 match data {
184 serde_json::Value::Bool(value) => *value,
185 serde_json::Value::String(value) => value.trim() == "(true)",
186 serde_json::Value::Array(values) => values.iter().any(parse_ready_json_value),
187 serde_json::Value::Object(map) => map.values().any(parse_ready_json_value),
188 _ => false,
189 }
190}
191
192pub fn query_subnet_registry_json(
194 network: Option<&str>,
195 root: &str,
196) -> Result<String, ReplicaQueryError> {
197 let bytes = local_query(network, root, "canic_subnet_registry")?;
198 let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
199 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
200 let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
201 serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
202}
203
204pub fn query_subnet_registry_json_from_root(
206 network: Option<&str>,
207 root: &str,
208 icp_root: &Path,
209) -> Result<String, ReplicaQueryError> {
210 let bytes = local_query_from_root(network, root, "canic_subnet_registry", icp_root)?;
211 let result = Decode!(&bytes, Result<SubnetRegistryResponseWire, CanicErrorWire>)
212 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
213 let response = result.map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
214 serde_json::to_string(&response.to_cli_json()).map_err(ReplicaQueryError::from)
215}
216
217fn local_query(
219 network: Option<&str>,
220 canister: &str,
221 method: &str,
222) -> Result<Vec<u8>, ReplicaQueryError> {
223 local_query_with_endpoint(canister, method, local_replica_endpoint(network))
224}
225
226fn local_query_from_root(
227 network: Option<&str>,
228 canister: &str,
229 method: &str,
230 icp_root: &Path,
231) -> Result<Vec<u8>, ReplicaQueryError> {
232 local_query_with_endpoint(
233 canister,
234 method,
235 local_replica_endpoint_from_root(network, icp_root),
236 )
237}
238
239fn local_query_with_endpoint(
240 canister: &str,
241 method: &str,
242 endpoint: String,
243) -> Result<Vec<u8>, ReplicaQueryError> {
244 let canister_id =
245 Principal::from_text(canister).map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
246 let arg = Encode!().map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
247 let sender = Principal::anonymous();
248 let envelope = QueryEnvelope {
249 content: QueryContent {
250 request_type: "query",
251 canister_id: canister_id.as_slice(),
252 method_name: method,
253 arg: &arg,
254 sender: sender.as_slice(),
255 ingress_expiry: ingress_expiry_nanos()?,
256 },
257 };
258 let body = serde_cbor::to_vec(&envelope)?;
259 let response = post_cbor(
260 &endpoint,
261 &format!("/api/v2/canister/{canister}/query"),
262 &body,
263 )?;
264 let query_response = serde_cbor::from_slice::<QueryResponse>(&response)?;
265
266 if query_response.status == "replied" {
267 return query_response
268 .reply
269 .map(|reply| reply.arg)
270 .ok_or_else(|| ReplicaQueryError::Query("missing query reply".to_string()));
271 }
272
273 Err(ReplicaQueryError::Rejected {
274 code: query_response.reject_code.unwrap_or_default(),
275 message: query_response.reject_message.unwrap_or_default(),
276 })
277}
278
279fn local_replica_endpoint(network: Option<&str>) -> String {
281 local_replica_endpoint_with_port(network, configured_local_gateway_port().ok())
282}
283
284fn local_replica_endpoint_with_port(network: Option<&str>, configured_port: Option<u16>) -> String {
286 if let Some(network) = network.filter(|network| network.starts_with("http://")) {
287 return network.trim_end_matches('/').to_string();
288 }
289
290 let port = configured_port.unwrap_or(DEFAULT_LOCAL_GATEWAY_PORT);
291 format!("http://127.0.0.1:{port}")
292}
293
294fn ingress_expiry_nanos() -> Result<u64, ReplicaQueryError> {
296 let now = SystemTime::now()
297 .duration_since(UNIX_EPOCH)
298 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
299 let expiry = now
300 .as_nanos()
301 .saturating_add(5 * 60 * 1_000_000_000)
302 .min(u128::from(u64::MAX));
303 u64::try_from(expiry).map_err(|err| ReplicaQueryError::Query(err.to_string()))
304}
305
306fn post_cbor(endpoint: &str, path: &str, body: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
308 let (host, port) = parse_http_endpoint(endpoint)?;
309 let mut stream = TcpStream::connect((host.as_str(), port))?;
310 let request = format!(
311 "POST {path} HTTP/1.1\r\nHost: {host}:{port}\r\nContent-Type: application/cbor\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
312 body.len()
313 );
314 stream.write_all(request.as_bytes())?;
315 stream.write_all(body)?;
316
317 let mut response = Vec::new();
318 stream.read_to_end(&mut response)?;
319 split_http_body(&response)
320}
321
322fn get_http_status(endpoint: &str) -> Result<Vec<u8>, ReplicaQueryError> {
323 let (host, port) = parse_http_endpoint(endpoint)?;
324 let mut stream = TcpStream::connect((host.as_str(), port))?;
325 let request =
326 format!("GET /api/v2/status HTTP/1.1\r\nHost: {host}:{port}\r\nConnection: close\r\n\r\n");
327 stream.write_all(request.as_bytes())?;
328
329 let mut response = Vec::new();
330 stream.read_to_end(&mut response)?;
331 split_http_body(&response)
332}
333
334fn parse_http_endpoint(endpoint: &str) -> Result<(String, u16), ReplicaQueryError> {
336 let rest = endpoint
337 .strip_prefix("http://")
338 .ok_or_else(|| ReplicaQueryError::Query(format!("unsupported endpoint {endpoint}")))?;
339 let authority = rest.split('/').next().unwrap_or(rest);
340 let (host, port) = authority
341 .rsplit_once(':')
342 .ok_or_else(|| ReplicaQueryError::Query(format!("missing port in {endpoint}")))?;
343 let port = port
344 .parse::<u16>()
345 .map_err(|err| ReplicaQueryError::Query(err.to_string()))?;
346 Ok((host.to_string(), port))
347}
348
349fn split_http_body(response: &[u8]) -> Result<Vec<u8>, ReplicaQueryError> {
351 let marker = b"\r\n\r\n";
352 let Some(index) = response
353 .windows(marker.len())
354 .position(|window| window == marker)
355 else {
356 return Err(ReplicaQueryError::Query(
357 "malformed HTTP response".to_string(),
358 ));
359 };
360 let header = String::from_utf8_lossy(&response[..index]);
361 let status_ok = header
362 .lines()
363 .next()
364 .is_some_and(|status| status.contains(" 2"));
365 if !status_ok {
366 return Err(ReplicaQueryError::Query(header.to_string()));
367 }
368 Ok(response[index + marker.len()..].to_vec())
369}
370
371#[derive(Serialize)]
376struct QueryEnvelope<'a> {
377 content: QueryContent<'a>,
378}
379
380#[derive(Serialize)]
385struct QueryContent<'a> {
386 request_type: &'static str,
387 #[serde(with = "serde_bytes")]
388 canister_id: &'a [u8],
389 method_name: &'a str,
390 #[serde(with = "serde_bytes")]
391 arg: &'a [u8],
392 #[serde(with = "serde_bytes")]
393 sender: &'a [u8],
394 ingress_expiry: u64,
395}
396
397#[derive(Deserialize)]
402struct QueryResponse {
403 status: String,
404 reply: Option<QueryReply>,
405 reject_code: Option<u64>,
406 reject_message: Option<String>,
407}
408
409#[derive(Deserialize)]
414struct QueryReply {
415 #[serde(with = "serde_bytes")]
416 arg: Vec<u8>,
417}
418
419#[derive(CandidType, Deserialize)]
424struct SubnetRegistryResponseWire(Vec<SubnetRegistryEntryWire>);
425
426impl SubnetRegistryResponseWire {
427 fn to_cli_json(&self) -> serde_json::Value {
429 serde_json::json!({
430 "Ok": self.0.iter().map(SubnetRegistryEntryWire::to_cli_json).collect::<Vec<_>>()
431 })
432 }
433}
434
435#[derive(CandidType, Deserialize)]
440struct SubnetRegistryEntryWire {
441 pid: Principal,
442 role: String,
443 record: CanisterInfoWire,
444}
445
446impl SubnetRegistryEntryWire {
447 fn to_cli_json(&self) -> serde_json::Value {
449 serde_json::json!({
450 "pid": self.pid.to_text(),
451 "role": self.role,
452 "record": self.record.to_cli_json(),
453 })
454 }
455}
456
457#[derive(CandidType, Deserialize)]
462struct CanisterInfoWire {
463 pid: Principal,
464 role: String,
465 parent_pid: Option<Principal>,
466 module_hash: Option<Vec<u8>>,
467 created_at: u64,
468}
469
470impl CanisterInfoWire {
471 fn to_cli_json(&self) -> serde_json::Value {
473 serde_json::json!({
474 "pid": self.pid.to_text(),
475 "role": self.role,
476 "parent_pid": self.parent_pid.as_ref().map(Principal::to_text),
477 "module_hash": self.module_hash,
478 "created_at": self.created_at.to_string(),
479 })
480 }
481}
482
483#[derive(CandidType, Deserialize)]
488struct CanicErrorWire {
489 code: ErrorCodeWire,
490 message: String,
491}
492
493impl fmt::Display for CanicErrorWire {
494 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
496 write!(formatter, "{:?}: {}", self.code, self.message)
497 }
498}
499
500#[derive(CandidType, Debug, Deserialize)]
505enum ErrorCodeWire {
506 Conflict,
507 Forbidden,
508 Internal,
509 InvalidInput,
510 InvariantViolation,
511 NotFound,
512 PolicyInstanceRequiresSingletonWithDirectory,
513 PolicyReplicaRequiresSingletonWithScaling,
514 PolicyRoleAlreadyRegistered,
515 PolicyShardRequiresSingletonWithSharding,
516 PolicySingletonAlreadyRegisteredUnderParent,
517 ResourceExhausted,
518 Unauthorized,
519 Unavailable,
520}
521
522#[cfg(test)]
523mod tests {
524 use super::*;
525
526 #[test]
528 fn parse_ready_json_value_accepts_nested_true_shapes() {
529 assert!(parse_ready_json_value(&serde_json::json!(true)));
530 assert!(parse_ready_json_value(&serde_json::json!({ "Ok": true })));
531 assert!(parse_ready_json_value(&serde_json::json!([{ "Ok": true }])));
532 assert!(parse_ready_json_value(&serde_json::json!({
533 "response_candid": "(true)"
534 })));
535 }
536
537 #[test]
539 fn parse_ready_json_value_rejects_false_shapes() {
540 assert!(!parse_ready_json_value(&serde_json::json!(false)));
541 assert!(!parse_ready_json_value(&serde_json::json!({ "Ok": false })));
542 assert!(!parse_ready_json_value(&serde_json::json!("true")));
543 }
544
545 #[test]
547 fn local_replica_endpoint_defaults_to_icp_cli_port() {
548 assert_eq!(
549 local_replica_endpoint_with_port(None, None),
550 "http://127.0.0.1:8000"
551 );
552 assert_eq!(
553 local_replica_endpoint_with_port(None, Some(8001)),
554 "http://127.0.0.1:8001"
555 );
556 assert_eq!(
557 local_replica_endpoint_with_port(Some("http://127.0.0.1:9000/"), Some(8001)),
558 "http://127.0.0.1:9000"
559 );
560 }
561
562 #[test]
563 fn parses_local_replica_root_key_from_json_status() {
564 let root_key = parse_local_replica_root_key(br#"{"root_key":"308182"}"#);
565
566 assert_eq!(root_key.as_deref(), Some("308182"));
567 }
568
569 #[test]
570 fn parses_local_replica_root_key_from_cbor_status() {
571 #[derive(Serialize)]
572 struct Status {
573 #[serde(with = "serde_bytes")]
574 root_key: Vec<u8>,
575 }
576
577 let body = serde_cbor::to_vec(&Status {
578 root_key: vec![0x30, 0x81, 0x82],
579 })
580 .expect("encode cbor status");
581 let root_key = parse_local_replica_root_key(&body);
582
583 assert_eq!(root_key.as_deref(), Some("308182"));
584 }
585
586 #[test]
587 fn rejects_blank_local_replica_root_key_status_values() {
588 #[derive(Serialize)]
589 struct Status {
590 #[serde(with = "serde_bytes")]
591 root_key: Vec<u8>,
592 }
593
594 assert_eq!(parse_local_replica_root_key(br#"{"root_key":" "}"#), None);
595
596 let body = serde_cbor::to_vec(&Status { root_key: vec![] })
597 .expect("encode empty cbor status root key");
598
599 assert_eq!(parse_local_replica_root_key(&body), None);
600 }
601}