use crate::*;
use bitcoin::block::Header;
use notification::Notification;
use pending_request::{ErroredRequest, PendingRequest, SatisfiedRequest};
use std::collections::HashMap;
#[derive(Debug, Clone)]
pub enum Event {
Response(SatisfiedRequest),
ResponseError(ErroredRequest),
Notification(Notification),
}
impl Event {
pub fn try_to_headers(&self) -> Option<Vec<(u32, Header)>> {
match self {
Event::Response(SatisfiedRequest::Header { req, resp }) => {
Some(vec![(req.height, resp.header)])
}
Event::Response(SatisfiedRequest::Headers { req, resp }) => {
Some((req.start_height..).zip(resp.headers.clone()).collect())
}
Event::Response(SatisfiedRequest::HeadersWithCheckpoint { req, resp }) => {
Some((req.start_height..).zip(resp.headers.clone()).collect())
}
Event::Notification(Notification::Header(n)) => Some(vec![(n.height(), *n.header())]),
_ => None,
}
}
}
#[derive(Debug)]
pub struct State<PReq: PendingRequest> {
pending: HashMap<u32, PReq>,
}
impl<PReq: PendingRequest> Default for State<PReq> {
fn default() -> Self {
Self::new()
}
}
impl<PReq: PendingRequest> State<PReq> {
pub fn new() -> Self {
Self {
pending: HashMap::new(),
}
}
pub fn clear(&mut self) {
self.pending.clear();
}
pub fn pending_requests(&self) -> impl Iterator<Item = RawRequest> + '_ {
self.pending.iter().map(|(&id, pending_req)| {
let (method, params) = pending_req.to_method_and_params();
RawRequest::new(id, method, params)
})
}
pub fn track_request<R>(&mut self, next_id: &mut u32, req: R) -> MaybeBatch<RawRequest>
where
R: Into<MaybeBatch<PReq>>,
{
fn _add_request<PReq: PendingRequest>(
state: &mut State<PReq>,
next_id: &mut u32,
req: PReq,
) -> RawRequest {
let id = *next_id;
*next_id = id.wrapping_add(1);
let (method, params) = req.to_method_and_params();
state.pending.insert(id, req);
RawRequest::new(id, method, params)
}
match req.into() {
MaybeBatch::Single(req) => _add_request(self, next_id, req).into(),
MaybeBatch::Batch(v) => v
.into_iter()
.map(|req| _add_request(self, next_id, req))
.collect::<Vec<_>>()
.into(),
}
}
pub fn process_incoming(
&mut self,
notification_or_response: RawNotificationOrResponse,
) -> Result<Option<Event>, ProcessError> {
match notification_or_response {
RawNotificationOrResponse::Notification(raw) => {
let notification = Notification::new(&raw).map_err(|error| {
ProcessError::CannotDeserializeNotification {
method: raw.method,
params: raw.params,
error,
}
})?;
Ok(Some(Event::Notification(notification)))
}
RawNotificationOrResponse::Response(resp) => {
let pending_req = self
.pending
.remove(&resp.id)
.ok_or(ProcessError::MissingRequest(resp.id))?;
Ok(match resp.result {
Ok(raw_resp) => pending_req
.satisfy(raw_resp)
.map_err(|de_err| ProcessError::CannotDeserializeResponse(resp.id, de_err))?
.map(Event::Response),
Err(raw_err) => pending_req.satisfy_error(raw_err).map(Event::ResponseError),
})
}
}
}
}
#[derive(Debug)]
pub enum ProcessError {
MissingRequest(u32),
CannotDeserializeResponse(u32, serde_json::Error),
CannotDeserializeNotification {
method: CowStr,
params: serde_json::Value,
error: serde_json::Error,
},
}
impl std::fmt::Display for ProcessError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ProcessError::MissingRequest(id) => {
write!(f, "no pending request found for response with id {}", id)
}
ProcessError::CannotDeserializeResponse(id, err) => {
write!(
f,
"failed to deserialize response for request id {}: {}",
id, err
)
}
ProcessError::CannotDeserializeNotification { method, error, .. } => {
write!(
f,
"failed to deserialize notification for method '{}': {}",
method, error
)
}
}
}
}
impl std::error::Error for ProcessError {}