use super::{io, wire};
use crate::net::request_id;
use commonware_codec::{EncodeShared, IsUnit, Read};
use commonware_cryptography::Digest;
use commonware_runtime::{Network, Spawner};
use commonware_storage::{mmr::Location, qmdb::sync};
use commonware_utils::channel::{mpsc, oneshot};
use std::num::NonZeroU64;
/// Client-side resolver that issues requests over a channel of
/// [`io::Request`] values carrying [`wire::Message`] payloads.
///
/// Type parameters:
/// - `Op`: operation type carried inside wire messages.
/// - `D`: digest type carried inside wire messages.
#[derive(Clone)]
pub struct Resolver<Op, D>
where
    Op: Read + EncodeShared + 'static,
    Op::Cfg: IsUnit,
    D: Digest,
{
    /// Produces the id attached to each outgoing request.
    request_id_generator: request_id::Generator,

    /// Sending half of the request channel; each item pairs a wire message
    /// with the oneshot sender on which its response is delivered.
    request_tx: mpsc::Sender<io::Request<wire::Message<Op, D>>>,
}
impl<Op, D> Resolver<Op, D>
where
Op: Read + EncodeShared,
Op::Cfg: IsUnit,
D: Digest,
{
pub async fn connect<E>(
context: E,
server_addr: std::net::SocketAddr,
) -> Result<Self, commonware_runtime::Error>
where
E: Network + Spawner,
{
let (sink, stream) = context.dial(server_addr).await?;
let (request_tx, _handle) = io::run(context, sink, stream)?;
Ok(Self {
request_id_generator: request_id::Generator::new(),
request_tx,
})
}
pub async fn get_sync_target(&self) -> Result<sync::Target<D>, crate::Error> {
let request_id = self.request_id_generator.next();
let request =
wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
let (tx, rx) = oneshot::channel();
self.request_tx
.clone()
.send(io::Request {
request,
response_tx: tx,
})
.await
.map_err(|_| crate::Error::RequestChannelClosed)?;
let response = rx
.await
.map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
match response {
wire::Message::GetSyncTargetResponse(r) => Ok(r.target),
wire::Message::Error(err) => Err(crate::Error::Server {
code: err.error_code,
message: err.message,
}),
_ => Err(crate::Error::UnexpectedResponse { request_id }),
}
}
}
impl<Op, D> sync::resolver::Resolver for Resolver<Op, D>
where
Op: Clone + Read + EncodeShared,
Op::Cfg: IsUnit,
D: Digest,
{
type Digest = D;
type Op = Op;
type Error = crate::Error;
async fn get_operations(
&self,
op_count: Location,
start_loc: Location,
max_ops: NonZeroU64,
include_pinned_nodes: bool,
_cancel_rx: oneshot::Receiver<()>,
) -> Result<sync::resolver::FetchResult<Self::Op, Self::Digest>, Self::Error> {
let request_id = self.request_id_generator.next();
let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
request_id,
op_count,
start_loc,
max_ops,
include_pinned_nodes,
});
let (tx, rx) = oneshot::channel();
self.request_tx
.clone()
.send(io::Request {
request,
response_tx: tx,
})
.await
.map_err(|_| crate::Error::RequestChannelClosed)?;
let response = rx
.await
.map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
let (proof, operations, pinned_nodes) = match response {
wire::Message::GetOperationsResponse(r) => (r.proof, r.operations, r.pinned_nodes),
wire::Message::Error(err) => {
return Err(crate::Error::Server {
code: err.error_code,
message: err.message,
})
}
_ => return Err(crate::Error::UnexpectedResponse { request_id }),
};
let (tx, _rx) = oneshot::channel();
Ok(sync::resolver::FetchResult {
proof,
operations,
success_tx: tx,
pinned_nodes,
})
}
}