1use eyre::OptionExt;
4use ora_proto::snapshot::v1::{
5 snapshot_service_client::SnapshotServiceClient, ImportRequest, SnapshotData,
6};
7use prost::bytes::Buf;
8use tonic::transport::Channel;
9
10#[derive(Debug, Clone)]
12#[must_use]
13pub struct SnapshotClient {
14 client: SnapshotServiceClient<Channel>,
15}
16
17impl SnapshotClient {
18 pub fn new(client: SnapshotServiceClient<Channel>) -> Self {
20 Self { client }
21 }
22
23 pub async fn export_to_bytes(&self) -> eyre::Result<Vec<u8>> {
25 use prost::Message;
26
27 let mut response = self
28 .client
29 .clone()
30 .export(tonic::Request::new(
31 ora_proto::snapshot::v1::ExportRequest {},
32 ))
33 .await?
34 .into_inner();
35
36 let mut buf = Vec::new();
37
38 while let Some(chunk) = response.message().await? {
39 let data = chunk.data.ok_or_eyre("missing data")?;
40 data.encode_length_delimited(&mut buf)?;
41 }
42
43 Ok(buf)
44 }
45
46 pub async fn import_from_bytes(&self, buf: Vec<u8>) -> eyre::Result<()> {
48 use futures::stream::StreamExt;
49 use prost::Message;
50
51 self.client
52 .clone()
53 .import(tonic::Request::new(
54 async_stream::stream!({
55 let mut buf = std::io::Cursor::new(buf);
56
57 while buf.has_remaining() {
58 let data = match SnapshotData::decode_length_delimited(&mut buf) {
59 Ok(data) => data,
60 Err(error) => {
61 tracing::error!(?error, "failed to decode snapshot data");
62 return;
63 }
64 };
65
66 yield ImportRequest { data: Some(data) };
67 }
68 })
69 .boxed(),
70 ))
71 .await?;
72
73 Ok(())
74 }
75}