commonware-sync 2026.5.0

Synchronize state between a server and client.
Documentation
use super::{io, wire};
use crate::net::request_id;
use commonware_codec::{EncodeShared, IsUnit, Read};
use commonware_cryptography::Digest;
use commonware_runtime::{Network, Spawner};
use commonware_storage::{
    mmr::{self, Location},
    qmdb::sync::{self, compact, resolver::fetch_operation_range},
};
use commonware_utils::channel::{mpsc, oneshot};
use std::num::NonZeroU64;

/// Network resolver that works directly with generic wire messages.
#[derive(Clone)]
pub struct Resolver<Op, D>
where
    Op: Read + EncodeShared + 'static,
    Op::Cfg: IsUnit,
    D: Digest,
{
    request_id_generator: request_id::Generator,
    request_tx: mpsc::Sender<io::Request<wire::Message<Op, D>>>,
}

impl<Op, D> Resolver<Op, D>
where
    Op: Read + EncodeShared,
    Op::Cfg: IsUnit,
    D: Digest,
{
    /// Returns a resolver connected to the server at the given address.
    pub async fn connect<E>(
        context: E,
        server_addr: std::net::SocketAddr,
    ) -> Result<Self, commonware_runtime::Error>
    where
        E: Network + Spawner,
    {
        let (sink, stream) = context.dial(server_addr).await?;
        let (request_tx, _handle) = io::run(context, sink, stream)?;
        Ok(Self {
            request_id_generator: request_id::Generator::new(),
            request_tx,
        })
    }

    /// Returns the current sync target from the server.
    pub async fn get_sync_target(&self) -> Result<sync::Target<mmr::Family, D>, crate::Error> {
        let request_id = self.request_id_generator.next();
        let request =
            wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
        let (tx, rx) = oneshot::channel();
        self.request_tx
            .clone()
            .send(io::Request {
                request,
                response_tx: tx,
            })
            .await
            .map_err(|_| crate::Error::RequestChannelClosed)?;
        let response = rx
            .await
            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
        match response {
            wire::Message::GetSyncTargetResponse(r) => Ok(r.target),
            wire::Message::Error(err) => Err(crate::Error::Server {
                code: err.error_code,
                message: err.message,
            }),
            _ => Err(crate::Error::UnexpectedResponse { request_id }),
        }
    }

    /// Returns the compact sync target currently served by the remote.
    pub async fn get_compact_target(
        &self,
    ) -> Result<compact::Target<mmr::Family, D>, crate::Error> {
        let request_id = self.request_id_generator.next();
        let request =
            wire::Message::GetCompactTargetRequest(wire::GetCompactTargetRequest { request_id });
        let (tx, rx) = oneshot::channel();
        self.request_tx
            .clone()
            .send(io::Request {
                request,
                response_tx: tx,
            })
            .await
            .map_err(|_| crate::Error::RequestChannelClosed)?;
        let response = rx
            .await
            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
        match response {
            wire::Message::GetCompactTargetResponse(r) => Ok(r.target),
            wire::Message::Error(err) => Err(crate::Error::Server {
                code: err.error_code,
                message: err.message,
            }),
            _ => Err(crate::Error::UnexpectedResponse { request_id }),
        }
    }

    /// Returns compact authenticated state for the given target.
    pub async fn get_compact_state(
        &self,
        target: compact::Target<mmr::Family, D>,
    ) -> Result<compact::State<mmr::Family, Op, D>, crate::Error> {
        let request_id = self.request_id_generator.next();
        let request = wire::Message::GetCompactStateRequest(wire::GetCompactStateRequest {
            request_id,
            target,
        });
        let (tx, rx) = oneshot::channel();
        self.request_tx
            .clone()
            .send(io::Request {
                request,
                response_tx: tx,
            })
            .await
            .map_err(|_| crate::Error::RequestChannelClosed)?;
        let response = rx
            .await
            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
        match response {
            wire::Message::GetCompactStateResponse(r) => Ok(r.state),
            wire::Message::Error(err) => Err(crate::Error::Server {
                code: err.error_code,
                message: err.message,
            }),
            _ => Err(crate::Error::UnexpectedResponse { request_id }),
        }
    }
}

impl<Op, D> sync::resolver::Resolver for Resolver<Op, D>
where
    Op: Clone + Read + EncodeShared,
    Op::Cfg: IsUnit,
    D: Digest,
{
    type Family = mmr::Family;
    type Digest = D;
    type Op = Op;
    type Error = crate::Error;

    async fn get_operations(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
        include_pinned_nodes: bool,
        _cancel_rx: oneshot::Receiver<()>,
    ) -> Result<sync::resolver::FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error>
    {
        fetch_operation_range(
            op_count,
            start_loc,
            max_ops,
            include_pinned_nodes,
            |op_count, start_loc, max_ops, include_pinned_nodes| async move {
                let request_id = self.request_id_generator.next();
                let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
                    request_id,
                    op_count,
                    start_loc,
                    max_ops,
                    include_pinned_nodes,
                });
                let (tx, rx) = oneshot::channel();
                self.request_tx
                    .clone()
                    .send(io::Request {
                        request,
                        response_tx: tx,
                    })
                    .await
                    .map_err(|_| crate::Error::RequestChannelClosed)?;
                let response = rx
                    .await
                    .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
                match response {
                    wire::Message::GetOperationsResponse(r) => {
                        Ok(sync::resolver::FetchedOperations::new(
                            r.proof,
                            r.operations,
                            r.pinned_nodes,
                        ))
                    }
                    wire::Message::Error(err) => Err(crate::Error::Server {
                        code: err.error_code,
                        message: err.message,
                    }),
                    _ => Err(crate::Error::UnexpectedResponse { request_id }),
                }
            },
        )
        .await
    }
}

impl<Op, D> compact::Resolver for Resolver<Op, D>
where
    Op: Clone + Read + EncodeShared,
    Op::Cfg: IsUnit,
    D: Digest,
{
    type Family = mmr::Family;
    type Digest = D;
    type Op = Op;
    type Error = crate::Error;

    async fn get_compact_state(
        &self,
        target: compact::Target<Self::Family, Self::Digest>,
    ) -> Result<compact::FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
        self.get_compact_state(target).await.map(Into::into)
    }
}