ora_client/
snapshot.rs

1//! High-level client for interacting with the Ora server snapshot endpoints.
2
3use eyre::OptionExt;
4use ora_proto::snapshot::v1::{
5    snapshot_service_client::SnapshotServiceClient, ImportRequest, SnapshotData,
6};
7use prost::bytes::Buf;
8use tonic::transport::Channel;
9
10/// A high-level client for interacting with the Ora server admin endpoints.
11#[derive(Debug, Clone)]
12#[must_use]
13pub struct SnapshotClient {
14    client: SnapshotServiceClient<Channel>,
15}
16
17impl SnapshotClient {
18    /// Create a new client from a gRPC client.
19    pub fn new(client: SnapshotServiceClient<Channel>) -> Self {
20        Self { client }
21    }
22
23    /// Export the current state of the server to a byte buffer.
24    pub async fn export_to_bytes(&self) -> eyre::Result<Vec<u8>> {
25        use prost::Message;
26
27        let mut response = self
28            .client
29            .clone()
30            .export(tonic::Request::new(
31                ora_proto::snapshot::v1::ExportRequest {},
32            ))
33            .await?
34            .into_inner();
35
36        let mut buf = Vec::new();
37
38        while let Some(chunk) = response.message().await? {
39            let data = chunk.data.ok_or_eyre("missing data")?;
40            data.encode_length_delimited(&mut buf)?;
41        }
42
43        Ok(buf)
44    }
45
46    /// Import a snapshot into the server from a byte buffer.
47    pub async fn import_from_bytes(&self, buf: Vec<u8>) -> eyre::Result<()> {
48        use futures::stream::StreamExt;
49        use prost::Message;
50
51        self.client
52            .clone()
53            .import(tonic::Request::new(
54                async_stream::stream!({
55                    let mut buf = std::io::Cursor::new(buf);
56
57                    while buf.has_remaining() {
58                        let data = match SnapshotData::decode_length_delimited(&mut buf) {
59                            Ok(data) => data,
60                            Err(error) => {
61                                tracing::error!(?error, "failed to decode snapshot data");
62                                return;
63                            }
64                        };
65
66                        yield ImportRequest { data: Some(data) };
67                    }
68                })
69                .boxed(),
70            ))
71            .await?;
72
73        Ok(())
74    }
75}