pub(crate) mod agent_config;
pub mod agent_error;
pub(crate) mod builder;
pub mod http_transport;
pub(crate) mod nonce;
pub(crate) mod replica_api;
pub(crate) mod response;
mod response_authentication;
pub mod status;
pub use agent_config::AgentConfig;
pub use agent_error::AgentError;
pub use builder::AgentBuilder;
pub use nonce::NonceFactory;
pub use response::{Replied, RequestStatusResponse};
#[cfg(test)]
mod agent_test;
use crate::agent::replica_api::{
CallRequestContent, Certificate, Delegation, Envelope, QueryContent, ReadStateContent,
ReadStateResponse,
};
use crate::export::Principal;
use crate::hash_tree::Label;
use crate::identity::Identity;
use crate::{to_request_id, RequestId};
use delay::Waiter;
use serde::Serialize;
use status::Status;
use crate::agent::response_authentication::{
extract_der, initialize_bls, lookup_canister_info, lookup_request_status, lookup_value,
};
use crate::bls::bls12381::bls;
use std::convert::TryFrom;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::time::Duration;
/// Prefix placed in front of a request id to build the message that is handed to the
/// identity for signing. The leading byte (0x0A = 10) equals the length of the ASCII
/// text `ic-request` that follows it.
const IC_REQUEST_DOMAIN_SEPARATOR: &[u8; 11] = b"\x0Aic-request";
/// Prefix placed in front of a certificate tree's root hash to build the message whose
/// signature is checked during certificate verification. The leading byte (0x0D = 13)
/// equals the length of the ASCII text `ic-state-root` that follows it.
const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
/// Root key that every `Agent` starts out with (see `Agent::new`); it can be replaced at
/// runtime through `Agent::fetch_root_key`. The bytes are run through `extract_der`
/// before being used for signature verification, so they are presumably a DER-encoded
/// public key — confirm against the upstream key material.
const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
/// Abstraction over the channel an `Agent` uses to talk to a replica.
///
/// Every method returns a boxed, `Send` future borrowing `self`, so the trait can be used
/// as a trait object (`Arc<dyn ReplicaV2Transport + Send + Sync>` inside `Agent`).
/// The `envelope` arguments are the CBOR-serialized, signed request envelopes produced
/// by the agent.
pub trait ReplicaV2Transport {
/// Submits an update call envelope. The agent also passes the `request_id` it computed
/// for the request; no payload is returned on success.
fn call<'a>(
&'a self,
effective_canister_id: Principal,
envelope: Vec<u8>,
request_id: RequestId,
) -> Pin<Box<dyn Future<Output = Result<(), AgentError>> + Send + 'a>>;
/// Submits a read-state envelope and yields the raw response bytes, which the agent
/// decodes as CBOR.
fn read_state<'a>(
&'a self,
effective_canister_id: Principal,
envelope: Vec<u8>,
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, AgentError>> + Send + 'a>>;
/// Submits a query envelope and yields the raw response bytes, which the agent decodes
/// as CBOR.
fn query<'a>(
&'a self,
effective_canister_id: Principal,
envelope: Vec<u8>,
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, AgentError>> + Send + 'a>>;
/// Fetches the replica status as raw bytes; the agent parses them as a CBOR value and
/// converts that into a `Status`.
fn status<'a>(
&'a self,
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, AgentError>> + Send + 'a>>;
}
/// Client handle used to sign requests and exchange them with a replica through a
/// `ReplicaV2Transport`.
///
/// Cloning is derived; the `Arc` fields (identity, root key, transport) are therefore
/// shared between clones rather than duplicated.
#[derive(Clone)]
pub struct Agent {
// Produces the optional nonce attached to update calls.
nonce_factory: NonceFactory,
// Supplies the sender principal and signs outgoing request messages.
identity: Arc<dyn Identity + Send + Sync>,
// Default lifetime applied to requests that carry no explicit ingress expiry.
ingress_expiry_duration: Duration,
// Key used to verify certificates that carry no delegation; replaceable at runtime
// via `fetch_root_key`.
root_key: Arc<RwLock<Option<Vec<u8>>>>,
// Channel through which serialized envelopes reach the replica.
transport: Arc<dyn ReplicaV2Transport + Send + Sync>,
}
impl Agent {
pub fn builder() -> builder::AgentBuilder {
Default::default()
}
/// Builds an `Agent` from `config`.
///
/// BLS is initialized first. The ingress expiry falls back to 300 seconds when the
/// configuration leaves it unset, and the root key is seeded with `IC_ROOT_KEY`.
///
/// # Errors
/// Propagates any error from `initialize_bls`, and returns
/// `AgentError::MissingReplicaTransport` when `config.transport` is `None`.
pub fn new(config: AgentConfig) -> Result<Agent, AgentError> {
    initialize_bls()?;

    let transport = match config.transport {
        Some(transport) => transport,
        None => return Err(AgentError::MissingReplicaTransport()),
    };
    let ingress_expiry_duration = match config.ingress_expiry_duration {
        Some(duration) => duration,
        None => Duration::from_secs(300),
    };
    let root_key = Arc::new(RwLock::new(Some(IC_ROOT_KEY.to_vec())));

    Ok(Agent {
        nonce_factory: config.nonce_factory,
        identity: config.identity,
        ingress_expiry_duration,
        root_key,
        transport,
    })
}
/// Replaces the transport used by this agent with `transport`.
///
/// Only this `Agent` value is affected; clones made earlier keep the `Arc` they hold.
pub fn set_transport<F: 'static + ReplicaV2Transport + Send + Sync>(&mut self, transport: F) {
    let shared: Arc<dyn ReplicaV2Transport + Send + Sync> = Arc::new(transport);
    self.transport = shared;
}
/// Queries the replica status and stores the root key it reports, replacing the key
/// the agent currently holds.
///
/// # Errors
/// * Any error produced by [`Agent::status`].
/// * `AgentError::NoRootKeyInStatus` when the status carries no root key.
/// * `AgentError::CouldNotReadRootKey` when the root-key lock is poisoned and the new
///   key therefore cannot be stored.
pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
    let status = self.status().await?;
    let root_key = status
        .root_key
        .clone()
        .ok_or(AgentError::NoRootKeyInStatus(status))?;
    // A poisoned lock used to be ignored, which reported success although the key was
    // never stored. Surface it with the same error that `read_root_key` yields for a
    // poisoned lock, so the caller learns the root key is unusable.
    let mut write_guard = self
        .root_key
        .write()
        .map_err(|_| AgentError::CouldNotReadRootKey())?;
    *write_guard = Some(root_key);
    Ok(())
}
/// Returns a copy of the root key currently held by the agent.
///
/// # Errors
/// `AgentError::CouldNotReadRootKey` when the lock cannot be acquired for reading or
/// when no key is stored.
fn read_root_key(&self) -> Result<Vec<u8>, AgentError> {
    let stored = self
        .root_key
        .read()
        .map_err(|_| AgentError::CouldNotReadRootKey())?;
    stored.clone().ok_or_else(AgentError::CouldNotReadRootKey)
}
/// Computes the default ingress expiry: the current time since the Unix epoch plus the
/// configured expiry duration, minus a 60-second permitted drift, in nanoseconds.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn get_expiry_date(&self) -> u64 {
    let permitted_drift = Duration::from_secs(60);
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("Time wrapped around.");
    let expiry = self.ingress_expiry_duration + since_epoch - permitted_drift;
    // `as_nanos` yields a u128; the cast keeps the low 64 bits.
    expiry.as_nanos() as u64
}
/// Builds the byte string that gets signed for a request: the request domain separator
/// followed by the bytes of `request_id`.
fn construct_message(&self, request_id: &RequestId) -> Vec<u8> {
    IC_REQUEST_DOMAIN_SEPARATOR
        .iter()
        .chain(request_id.as_slice().iter())
        .copied()
        .collect()
}
/// Signs `request`, wraps it in an envelope, sends the self-describing CBOR encoding
/// through the transport's `query` method and decodes the reply as `A`.
///
/// # Errors
/// Fails when the request id cannot be computed, signing fails
/// (`AgentError::SigningError`), serialization fails, the transport reports an error,
/// or the reply is not valid CBOR for `A` (`AgentError::InvalidCborData`).
async fn query_endpoint<A>(
    &self,
    effective_canister_id: Principal,
    request: QueryContent,
) -> Result<A, AgentError>
where
    A: serde::de::DeserializeOwned,
{
    // The signature covers the domain-separated request id, not the raw request.
    let signable = self.construct_message(&to_request_id(&request)?);
    let signed = self
        .identity
        .sign(&signable)
        .map_err(AgentError::SigningError)?;

    let mut payload = Vec::new();
    {
        let mut cbor = serde_cbor::Serializer::new(&mut payload);
        cbor.self_describe()?;
        let envelope = Envelope {
            content: request,
            sender_pubkey: signed.public_key,
            sender_sig: signed.signature,
        };
        envelope.serialize(&mut cbor)?;
    }

    let reply = self
        .transport
        .query(effective_canister_id, payload)
        .await?;
    serde_cbor::from_slice(&reply).map_err(AgentError::InvalidCborData)
}
/// Signs `request`, wraps it in an envelope, sends the self-describing CBOR encoding
/// through the transport's `read_state` method and decodes the reply as `A`.
///
/// # Errors
/// Fails when the request id cannot be computed, signing fails
/// (`AgentError::SigningError`), serialization fails, the transport reports an error,
/// or the reply is not valid CBOR for `A` (`AgentError::InvalidCborData`).
async fn read_state_endpoint<A>(
    &self,
    effective_canister_id: Principal,
    request: ReadStateContent,
) -> Result<A, AgentError>
where
    A: serde::de::DeserializeOwned,
{
    let id = to_request_id(&request)?;
    let to_sign = self.construct_message(&id);
    let signed = self
        .identity
        .sign(&to_sign)
        .map_err(AgentError::SigningError)?;

    let wrapped = Envelope {
        content: request,
        sender_pubkey: signed.public_key,
        sender_sig: signed.signature,
    };

    let mut encoded = Vec::new();
    {
        let mut writer = serde_cbor::Serializer::new(&mut encoded);
        // The self-describe marker is written ahead of the envelope itself.
        writer.self_describe()?;
        wrapped.serialize(&mut writer)?;
    }

    let raw_reply = self
        .transport
        .read_state(effective_canister_id, encoded)
        .await?;
    serde_cbor::from_slice(&raw_reply).map_err(AgentError::InvalidCborData)
}
/// Signs `request`, wraps it in an envelope and submits the self-describing CBOR
/// encoding through the transport's `call` method.
///
/// Returns the request id computed for `request`, which the caller can later use to
/// look up the request's status.
///
/// # Errors
/// Fails when the request id cannot be computed, signing fails
/// (`AgentError::SigningError`), serialization fails, or the transport reports an error.
async fn call_endpoint(
    &self,
    effective_canister_id: Principal,
    request: CallRequestContent,
) -> Result<RequestId, AgentError> {
    let request_id = to_request_id(&request)?;
    let signed = self
        .identity
        .sign(&self.construct_message(&request_id))
        .map_err(AgentError::SigningError)?;

    let envelope = Envelope {
        content: request,
        sender_pubkey: signed.public_key,
        sender_sig: signed.signature,
    };

    let mut payload = Vec::new();
    {
        let mut cbor = serde_cbor::Serializer::new(&mut payload);
        cbor.self_describe()?;
        envelope.serialize(&mut cbor)?;
    }

    self.transport
        .call(effective_canister_id, payload, request_id)
        .await
        .map(|()| request_id)
}
/// Performs a query call against `method_name` on `canister_id` and returns the raw
/// reply argument bytes.
///
/// `ingress_expiry_datetime` overrides the default expiry from `get_expiry_date` when
/// it is `Some`.
///
/// # Errors
/// `AgentError::SigningError` when the sender cannot be determined,
/// `AgentError::ReplicaError` when the replica rejects the query, plus anything
/// `query_endpoint` can return.
async fn query_raw(
    &self,
    canister_id: &Principal,
    effective_canister_id: Principal,
    method_name: &str,
    arg: &[u8],
    ingress_expiry_datetime: Option<u64>,
) -> Result<Vec<u8>, AgentError> {
    let request = QueryContent::QueryRequest {
        sender: self.identity.sender().map_err(AgentError::SigningError)?,
        canister_id: canister_id.clone(),
        method_name: method_name.to_string(),
        arg: arg.to_vec(),
        ingress_expiry: match ingress_expiry_datetime {
            Some(explicit) => explicit,
            None => self.get_expiry_date(),
        },
    };

    let response = self
        .query_endpoint::<replica_api::QueryResponse>(effective_canister_id, request)
        .await?;

    match response {
        replica_api::QueryResponse::Replied { reply } => Ok(reply.arg),
        replica_api::QueryResponse::Rejected {
            reject_code,
            reject_message,
        } => Err(AgentError::ReplicaError {
            reject_code,
            reject_message,
        }),
    }
}
/// Submits an update call to `method_name` on `canister_id` and returns the id of the
/// submitted request.
///
/// A nonce from the agent's nonce factory is attached when the factory yields one.
/// `ingress_expiry_datetime` overrides the default expiry from `get_expiry_date` when
/// it is `Some`.
///
/// # Errors
/// `AgentError::SigningError` when the sender cannot be determined, plus anything
/// `call_endpoint` can return.
async fn update_raw(
    &self,
    canister_id: &Principal,
    effective_canister_id: Principal,
    method_name: &str,
    arg: &[u8],
    ingress_expiry_datetime: Option<u64>,
) -> Result<RequestId, AgentError> {
    // Field expressions are kept in their original order so the nonce is still drawn
    // before the sender lookup can fail.
    let request = CallRequestContent::CallRequest {
        canister_id: canister_id.clone(),
        method_name: method_name.into(),
        arg: arg.to_vec(),
        nonce: self.nonce_factory.generate().map(|b| b.as_slice().into()),
        sender: self.identity.sender().map_err(AgentError::SigningError)?,
        ingress_expiry: match ingress_expiry_datetime {
            Some(explicit) => explicit,
            None => self.get_expiry_date(),
        },
    };
    self.call_endpoint(effective_canister_id, request).await
}
/// Requests the given state-tree `paths` from the replica and returns the certificate
/// from the response after it has passed `verify`.
///
/// # Errors
/// `AgentError::SigningError` when the sender cannot be determined,
/// `AgentError::InvalidCborData` when the certificate bytes do not decode, plus
/// anything `read_state_endpoint` or `verify` can return.
async fn read_state_raw(
    &self,
    paths: Vec<Vec<Label>>,
    effective_canister_id: Principal,
) -> Result<Certificate, AgentError> {
    let request = ReadStateContent::ReadStateRequest {
        sender: self.identity.sender().map_err(AgentError::SigningError)?,
        paths,
        ingress_expiry: self.get_expiry_date(),
    };
    let response: ReadStateResponse = self
        .read_state_endpoint(effective_canister_id, request)
        .await?;

    // The certificate travels as an embedded CBOR blob inside the response.
    let certificate: Certificate =
        serde_cbor::from_slice(&response.certificate).map_err(AgentError::InvalidCborData)?;
    self.verify(&certificate)?;
    Ok(certificate)
}
/// Checks the signature of `cert`.
///
/// The signed message is the state-root domain separator followed by the digest of the
/// certificate's tree. The verification key comes from `check_delegation` and is passed
/// through `extract_der` before the BLS check.
///
/// # Errors
/// `AgentError::CertificateVerificationFailed` when the BLS check does not return
/// `BLS_OK`, plus anything `check_delegation` or `extract_der` can return.
fn verify(&self, cert: &Certificate) -> Result<(), AgentError> {
    let root_hash = cert.tree.digest();
    let mut signed_payload = Vec::new();
    signed_payload.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
    signed_payload.extend_from_slice(&root_hash);

    let key = extract_der(self.check_delegation(&cert.delegation)?)?;

    let outcome = bls::core_verify(&cert.signature, &*signed_payload, &*key);
    if outcome == bls::BLS_OK {
        Ok(())
    } else {
        Err(AgentError::CertificateVerificationFailed())
    }
}
/// Determines the key bytes that a certificate's signature must be checked against.
///
/// Without a delegation this is the agent's root key. With a delegation, the embedded
/// certificate is decoded and verified, and the key is looked up in it under
/// `subnet/<subnet_id>/public_key`.
///
/// # Errors
/// `AgentError::InvalidCborData` when the delegation certificate does not decode, plus
/// anything `read_root_key`, `verify` or `lookup_value` can return.
fn check_delegation(&self, delegation: &Option<Delegation>) -> Result<Vec<u8>, AgentError> {
    let delegation = match delegation {
        Some(delegation) => delegation,
        None => return self.read_root_key(),
    };

    let cert: Certificate =
        serde_cbor::from_slice(&delegation.certificate).map_err(AgentError::InvalidCborData)?;
    // NOTE(review): `verify` calls back into `check_delegation`, so a delegation
    // certificate that itself carries a delegation recurses further — confirm whether
    // nesting depth should be bounded.
    self.verify(&cert)?;

    let public_key_path = vec![
        "subnet".into(),
        delegation.subnet_id.clone().into(),
        "public_key".into(),
    ];
    let public_key = lookup_value(&cert, public_key_path)?;
    Ok(public_key.to_vec())
}
/// Reads the state-tree entry `canister/<canister_id>/<path>` and returns the value
/// extracted by `lookup_canister_info` from the verified certificate.
///
/// The canister itself is used as the effective canister id of the request.
///
/// # Errors
/// Anything `read_state_raw` or `lookup_canister_info` can return.
pub async fn read_state_canister_info(
    &self,
    canister_id: Principal,
    path: &str,
) -> Result<Vec<u8>, AgentError> {
    let info_path: Vec<Label> = vec![
        "canister".into(),
        canister_id.clone().into(),
        path.into(),
    ];
    let requested: Vec<Vec<Label>> = vec![info_path];
    let certificate = self.read_state_raw(requested, canister_id.clone()).await?;
    lookup_canister_info(certificate, canister_id, path)
}
/// Reads the state-tree entry `request_status/<request_id>` and returns the status
/// extracted by `lookup_request_status` from the verified certificate.
///
/// # Errors
/// Anything `read_state_raw` or `lookup_request_status` can return.
pub async fn request_status_raw(
    &self,
    request_id: &RequestId,
    effective_canister_id: Principal,
) -> Result<RequestStatusResponse, AgentError> {
    let status_path: Vec<Label> = vec!["request_status".into(), request_id.to_vec().into()];
    let requested: Vec<Vec<Label>> = vec![status_path];
    let certificate = self.read_state_raw(requested, effective_canister_id).await?;
    lookup_request_status(certificate, request_id)
}
/// Starts building an update call to `method_name` on `canister_id`.
///
/// The returned builder borrows this agent and receives its own copy of `canister_id`.
pub fn update<S: Into<String>>(
    &self,
    canister_id: &Principal,
    method_name: S,
) -> UpdateBuilder {
    let target = canister_id.clone();
    let method: String = method_name.into();
    UpdateBuilder::new(self, target, method)
}
/// Fetches the replica status through the transport and converts it into a `Status`.
///
/// # Errors
/// Any transport error, `AgentError::InvalidCborData` when the bytes are not valid
/// CBOR, and `AgentError::InvalidReplicaStatus` when the CBOR value cannot be turned
/// into a `Status`.
pub async fn status(&self) -> Result<Status, AgentError> {
    let raw = self.transport.status().await?;
    let value: serde_cbor::Value =
        serde_cbor::from_slice(&raw).map_err(AgentError::InvalidCborData)?;
    match Status::try_from(&value) {
        Ok(status) => Ok(status),
        Err(_) => Err(AgentError::InvalidReplicaStatus),
    }
}
/// Starts building a query call to `method_name` on `canister_id`.
///
/// The returned builder borrows this agent and receives its own copy of `canister_id`.
pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
    let target = canister_id.clone();
    let method: String = method_name.into();
    QueryBuilder::new(self, target, method)
}
}
/// Builder for a query call; configure it with the `with_*` / `expire_*` methods and
/// send it with `call`.
pub struct QueryBuilder<'agent> {
// Agent that signs and sends the query.
agent: &'agent Agent,
// Canister id handed to the transport; starts out equal to `canister_id`.
effective_canister_id: Principal,
// Canister whose method is queried.
canister_id: Principal,
// Name of the method to query.
method_name: String,
// Raw argument bytes; empty until `with_arg` is called.
arg: Vec<u8>,
// Explicit ingress expiry in nanoseconds since the Unix epoch; `None` lets the agent
// pick its default.
ingress_expiry_datetime: Option<u64>,
}
impl<'agent> QueryBuilder<'agent> {
/// Creates a query builder for `method_name` on `canister_id`.
///
/// The effective canister id starts as a copy of `canister_id`, the argument is empty
/// and no explicit ingress expiry is set.
pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
    let effective_canister_id = canister_id.clone();
    Self {
        agent,
        effective_canister_id,
        canister_id,
        method_name,
        arg: Vec::new(),
        ingress_expiry_datetime: None,
    }
}
/// Overrides the effective canister id handed to the transport and returns the
/// builder for chaining.
pub fn with_effective_canister_id(&mut self, canister_id: Principal) -> &mut Self {
    let builder = self;
    builder.effective_canister_id = canister_id;
    builder
}
/// Replaces the call argument with a copy of the bytes in `arg` and returns the
/// builder for chaining.
pub fn with_arg<A: AsRef<[u8]>>(&mut self, arg: A) -> &mut Self {
    let bytes: &[u8] = arg.as_ref();
    self.arg = Vec::from(bytes);
    self
}
/// Sets the ingress expiry to the absolute point in time `time`, stored as nanoseconds
/// since the Unix epoch, and returns the builder for chaining.
///
/// # Panics
/// Panics if `time` lies before the Unix epoch.
pub fn expire_at(&mut self, time: std::time::SystemTime) -> &mut Self {
    let since_epoch = time
        .duration_since(std::time::UNIX_EPOCH)
        .expect("Time wrapped around");
    // `as_nanos` yields a u128; the cast keeps the low 64 bits.
    let nanos = since_epoch.as_nanos() as u64;
    self.ingress_expiry_datetime = Some(nanos);
    self
}
/// Sets the ingress expiry to the current time plus `duration`, minus a 60-second
/// permitted drift, stored as nanoseconds since the Unix epoch. Returns the builder
/// for chaining.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
pub fn expire_after(&mut self, duration: std::time::Duration) -> &mut Self {
    let permitted_drift = Duration::from_secs(60);
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("Time wrapped around");
    let expiry = duration + since_epoch - permitted_drift;
    self.ingress_expiry_datetime = Some(expiry.as_nanos() as u64);
    self
}
/// Sends the configured query through the agent and returns the raw reply bytes.
///
/// # Errors
/// Anything `Agent::query_raw` can return.
pub async fn call(&self) -> Result<Vec<u8>, AgentError> {
    let agent = self.agent;
    let effective_canister_id = self.effective_canister_id.clone();
    agent
        .query_raw(
            &self.canister_id,
            effective_canister_id,
            &self.method_name,
            &self.arg,
            self.ingress_expiry_datetime,
        )
        .await
}
}
/// Builder for an update call; configure it with the `with_*` / `expire_*` methods and
/// submit it with `call` or `call_and_wait`.
pub struct UpdateBuilder<'agent> {
// Agent that signs and submits the call.
agent: &'agent Agent,
/// Canister id handed to the transport; starts out equal to `canister_id`.
pub effective_canister_id: Principal,
/// Canister whose method is called.
pub canister_id: Principal,
/// Name of the method to call.
pub method_name: String,
/// Raw argument bytes; empty until `with_arg` is called.
pub arg: Vec<u8>,
/// Explicit ingress expiry in nanoseconds since the Unix epoch; `None` lets the agent
/// pick its default.
pub ingress_expiry_datetime: Option<u64>,
}
impl<'agent> UpdateBuilder<'agent> {
/// Creates an update builder for `method_name` on `canister_id`.
///
/// The effective canister id starts as a copy of `canister_id`, the argument is empty
/// and no explicit ingress expiry is set.
pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
    let effective_canister_id = canister_id.clone();
    Self {
        agent,
        effective_canister_id,
        canister_id,
        method_name,
        arg: Vec::new(),
        ingress_expiry_datetime: None,
    }
}
/// Overrides the effective canister id handed to the transport and returns the
/// builder for chaining.
pub fn with_effective_canister_id(&mut self, canister_id: Principal) -> &mut Self {
    let builder = self;
    builder.effective_canister_id = canister_id;
    builder
}
/// Replaces the call argument with a copy of the bytes in `arg` and returns the
/// builder for chaining.
pub fn with_arg<A: AsRef<[u8]>>(&mut self, arg: A) -> &mut Self {
    let bytes: &[u8] = arg.as_ref();
    self.arg = Vec::from(bytes);
    self
}
/// Sets the ingress expiry to the absolute point in time `time`, stored as nanoseconds
/// since the Unix epoch, and returns the builder for chaining.
///
/// # Panics
/// Panics if `time` lies before the Unix epoch.
pub fn expire_at(&mut self, time: std::time::SystemTime) -> &mut Self {
    let since_epoch = time
        .duration_since(std::time::UNIX_EPOCH)
        .expect("Time wrapped around");
    // `as_nanos` yields a u128; the cast keeps the low 64 bits.
    let nanos = since_epoch.as_nanos() as u64;
    self.ingress_expiry_datetime = Some(nanos);
    self
}
/// Sets the ingress expiry to the current time plus `duration`, minus a 60-second
/// permitted drift, stored as nanoseconds since the Unix epoch. Returns the builder
/// for chaining.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
pub fn expire_after(&mut self, duration: std::time::Duration) -> &mut Self {
    let permitted_drift = Duration::from_secs(60);
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("Time wrapped around");
    let expiry = duration + since_epoch - permitted_drift;
    self.ingress_expiry_datetime = Some(expiry.as_nanos() as u64);
    self
}
/// Submits the update call and polls its status until the request is replied to,
/// rejected, or the waiter gives up.
///
/// The waiter is started after submission and restarted once, the first time the
/// request is reported as `Received` or `Processing`. Between polls the waiter's
/// `wait` is invoked.
///
/// # Errors
/// * `AgentError::ReplicaError` when the request is rejected.
/// * `AgentError::RequestStatusDoneNoReply` when the status is `Done`.
/// * `AgentError::WaiterRestartError` when restarting the waiter fails.
/// * `AgentError::TimeoutWaitingForResponse` when the waiter's `wait` fails.
/// * Anything submitting the call or `Agent::request_status_raw` can return.
pub async fn call_and_wait<W: Waiter>(&self, mut waiter: W) -> Result<Vec<u8>, AgentError> {
    // `call` issues exactly the `update_raw` request this builder describes.
    let request_id = self.call().await?;

    waiter.start();
    let mut restarted_on_acceptance = false;
    loop {
        let status = self
            .agent
            .request_status_raw(&request_id, self.effective_canister_id.clone())
            .await?;

        match status {
            RequestStatusResponse::Replied {
                reply: Replied::CallReplied(arg),
            } => return Ok(arg),
            RequestStatusResponse::Rejected {
                reject_code,
                reject_message,
            } => {
                return Err(AgentError::ReplicaError {
                    reject_code,
                    reject_message,
                })
            }
            RequestStatusResponse::Done => {
                return Err(AgentError::RequestStatusDoneNoReply(String::from(
                    request_id,
                )))
            }
            // Nothing known about the request yet: just keep polling.
            RequestStatusResponse::Unknown => {}
            RequestStatusResponse::Received | RequestStatusResponse::Processing => {
                if !restarted_on_acceptance {
                    waiter
                        .restart()
                        .map_err(|_| AgentError::WaiterRestartError())?;
                    restarted_on_acceptance = true;
                }
            }
        }

        // NOTE(review): `wait` is a synchronous call inside an async fn — confirm it
        // does not block the executor thread.
        waiter
            .wait()
            .map_err(|_| AgentError::TimeoutWaitingForResponse())?;
    }
}
/// Submits the configured update call through the agent and returns the id of the
/// submitted request without waiting for its outcome.
///
/// # Errors
/// Anything `Agent::update_raw` can return.
pub async fn call(&self) -> Result<RequestId, AgentError> {
    let agent = self.agent;
    let effective_canister_id = self.effective_canister_id.clone();
    agent
        .update_raw(
            &self.canister_id,
            effective_canister_id,
            &self.method_name,
            &self.arg,
            self.ingress_expiry_datetime,
        )
        .await
}
}