use crate::{
error::Error,
events::{Event, TimeoutHandled},
handlers::{validate_state_machine, MessageResult},
host::{IsmpHost, StateMachine},
messaging::{hash_post_response, hash_request, TimeoutMessage},
router::Response,
};
use alloc::vec::Vec;
pub fn handle<H>(host: &H, msg: TimeoutMessage) -> Result<MessageResult, anyhow::Error>
where
H: IsmpHost,
{
let consensus_clients = host.consensus_clients();
let check_state_machine_client = |state_machine: StateMachine| {
consensus_clients
.iter()
.find_map(|client| client.state_machine(state_machine).ok())
.is_none()
};
let results = match msg {
TimeoutMessage::Post { requests, timeout_proof } => {
let state_machine = validate_state_machine(host, timeout_proof.height)?;
let state = host.state_machine_commitment(timeout_proof.height)?;
for request in &requests {
if request.dest_chain() != timeout_proof.height.id.state_id &&
!(host.is_allowed_proxy(&timeout_proof.height.id.state_id) &&
check_state_machine_client(request.dest_chain()))
{
Err(Error::RequestProxyProhibited { meta: request.into() })?
}
let commitment = hash_request::<H>(request);
if host.request_commitment(commitment).is_err() {
Err(Error::UnknownRequest { meta: request.into() })?
}
if !request.timed_out(state.timestamp()) {
Err(Error::RequestTimeoutNotElapsed {
meta: request.into(),
timeout_timestamp: request.timeout(),
state_machine_time: state.timestamp(),
})?
}
}
let keys = state_machine.receipts_state_trie_key(requests.clone().into());
let values = state_machine.verify_state_proof(host, keys, state, &timeout_proof)?;
if values.into_iter().any(|(_key, val)| val.is_some()) {
Err(Error::Custom("Some Requests in the batch have been delivered".into()))?
}
let router = host.ismp_router();
requests
.into_iter()
.map(|request| {
let cb = router.module_for_id(request.source_module())?;
let meta = host.delete_request_commitment(&request)?;
let mut signer = None;
if host.host_state_machine() != request.source_chain() {
signer = host.delete_request_receipt(&request).ok();
}
let res = cb.on_timeout(request.clone().into()).map(|_| {
let commitment = hash_request::<H>(&request);
Event::PostRequestTimeoutHandled(TimeoutHandled {
commitment,
source: request.source_chain(),
dest: request.dest_chain(),
})
});
if res.is_err() {
host.store_request_commitment(&request, meta)?;
if host.host_state_machine() != request.source_chain() && signer.is_some() {
host.store_request_receipt(&request, &signer.expect("Infaliible"))?;
}
}
Ok::<_, anyhow::Error>(res)
})
.collect::<Result<Vec<_>, _>>()?
},
TimeoutMessage::PostResponse { responses, timeout_proof } => {
let state_machine = validate_state_machine(host, timeout_proof.height)?;
let state = host.state_machine_commitment(timeout_proof.height)?;
for response in &responses {
if response.dest_chain() != timeout_proof.height.id.state_id &&
!(host.is_allowed_proxy(&timeout_proof.height.id.state_id) &&
check_state_machine_client(response.dest_chain()))
{
Err(Error::ResponseProxyProhibited {
meta: Response::Post(response.clone()).into(),
})?
}
let commitment = hash_post_response::<H>(response);
if host.response_commitment(commitment).is_err() {
Err(Error::UnknownResponse { meta: Response::Post(response.clone()).into() })?
}
if response.timeout() > state.timestamp() {
Err(Error::RequestTimeoutNotElapsed {
meta: response.into(),
timeout_timestamp: response.timeout(),
state_machine_time: state.timestamp(),
})?
}
}
let items = responses.iter().map(|r| Into::into(r.clone())).collect::<Vec<Response>>();
let keys = state_machine.receipts_state_trie_key(items.into());
let values = state_machine.verify_state_proof(host, keys, state, &timeout_proof)?;
if values.into_iter().any(|(_key, val)| val.is_some()) {
Err(Error::Custom("Some responses in the batch have been delivered".into()))?
}
let router = host.ismp_router();
responses
.into_iter()
.map(|response| {
let cb = router.module_for_id(response.source_module())?;
let meta = host.delete_response_commitment(&response)?;
let mut signer = None;
if host.host_state_machine() != response.source_chain() {
signer =
host.delete_response_receipt(&Response::Post(response.clone())).ok();
}
let res = cb.on_timeout(response.clone().into()).map(|_| {
let commitment = hash_post_response::<H>(&response);
Event::PostResponseTimeoutHandled(TimeoutHandled {
commitment,
source: response.source_chain(),
dest: response.dest_chain(),
})
});
if res.is_err() {
host.store_response_commitment(&response, meta)?;
if host.host_state_machine() != response.source_chain() && signer.is_some()
{
host.store_response_receipt(
&Response::Post(response),
&signer.expect("Infallible"),
)?;
}
}
Ok::<_, anyhow::Error>(res)
})
.collect::<Result<Vec<_>, _>>()?
},
TimeoutMessage::Get { requests } => {
for request in &requests {
let commitment = hash_request::<H>(request);
if host.request_commitment(commitment).is_err() {
Err(Error::UnknownRequest { meta: request.into() })?
}
if !request.timed_out(host.timestamp()) {
Err(Error::RequestTimeoutNotElapsed {
meta: request.into(),
timeout_timestamp: request.timeout(),
state_machine_time: host.timestamp(),
})?
}
}
let router = host.ismp_router();
requests
.into_iter()
.map(|request| {
let cb = router.module_for_id(request.source_module())?;
let meta = host.delete_request_commitment(&request)?;
let res = cb.on_timeout(request.clone().into()).map(|_| {
let commitment = hash_request::<H>(&request);
Event::GetRequestTimeoutHandled(TimeoutHandled {
commitment,
source: request.source_chain(),
dest: request.dest_chain(),
})
});
if res.is_err() {
host.store_request_commitment(&request, meta)?;
}
Ok::<_, anyhow::Error>(res)
})
.collect::<Result<Vec<_>, _>>()?
},
};
Ok(MessageResult::Timeout(results))
}