graft_client/
metastore.rs1use bytes::Bytes;
2use culprit::{Culprit, ResultExt};
3use graft_core::{VolumeId, gid::ClientId, lsn::LSN, page_count::PageCount};
4use graft_proto::{
5 common::v1::{Commit, LsnRange, SegmentInfo, Snapshot},
6 metastore::v1::{
7 CommitRequest, CommitResponse, PullCommitsRequest, PullCommitsResponse, PullGraftRequest,
8 PullGraftResponse, SnapshotRequest, SnapshotResponse,
9 },
10};
11use splinter_rs::SplinterRef;
12use std::ops::RangeBounds;
13use url::Url;
14
15use crate::NetClient;
16use crate::{error, net::EndpointBuilder};
17
18#[derive(Debug, Clone)]
19pub struct MetastoreClient {
20 endpoint: EndpointBuilder,
21 client: NetClient,
22}
23
24impl MetastoreClient {
25 pub fn new(root: Url, client: NetClient) -> Self {
26 Self { endpoint: root.into(), client }
27 }
28
29 pub fn snapshot(
30 &self,
31 vid: &VolumeId,
32 lsn: Option<LSN>,
33 ) -> Result<Option<Snapshot>, Culprit<error::ClientErr>> {
34 let uri = self.endpoint.build("/metastore/v1/snapshot")?;
35 let req = SnapshotRequest {
36 vid: vid.copy_to_bytes(),
37 lsn: lsn.map(Into::into),
38 };
39 match self.client.send::<_, SnapshotResponse>(uri, req) {
40 Ok(resp) => Ok(resp.snapshot),
41 Err(err) if err.ctx().is_snapshot_missing() => Ok(None),
42 Err(err) => Err(err),
43 }
44 }
45
46 #[allow(clippy::type_complexity)]
47 pub fn pull_graft<R: RangeBounds<LSN>>(
48 &self,
49 vid: &VolumeId,
50 range: R,
51 ) -> Result<Option<(Snapshot, LsnRange, SplinterRef<Bytes>)>, Culprit<error::ClientErr>> {
52 let uri = self.endpoint.build("/metastore/v1/pull_graft")?;
53 let req = PullGraftRequest {
54 vid: vid.copy_to_bytes(),
55 range: Some(LsnRange::from_range(range)),
56 };
57 match self.client.send::<_, PullGraftResponse>(uri, req) {
58 Ok(resp) => {
59 let snapshot = resp.snapshot.expect("snapshot is missing");
60 let range = resp.range.expect("range is missing");
61 let graft = SplinterRef::from_bytes(resp.graft).or_into_ctx()?;
62 Ok(Some((snapshot, range, graft)))
63 }
64 Err(err) if err.ctx().is_snapshot_missing() => Ok(None),
65 Err(err) => Err(err),
66 }
67 }
68
69 pub fn pull_commits<R>(
70 &self,
71 vid: &VolumeId,
72 range: R,
73 ) -> Result<Vec<Commit>, Culprit<error::ClientErr>>
74 where
75 R: RangeBounds<LSN>,
76 {
77 let uri = self.endpoint.build("/metastore/v1/pull_commits")?;
78 let req = PullCommitsRequest {
79 vid: vid.copy_to_bytes(),
80 range: Some(LsnRange::from_range(range)),
81 };
82 self.client
83 .send::<_, PullCommitsResponse>(uri, req)
84 .map(|resp| resp.commits)
85 }
86
87 pub fn commit(
88 &self,
89 vid: &VolumeId,
90 cid: &ClientId,
91 snapshot_lsn: Option<LSN>,
92 page_count: PageCount,
93 segments: Vec<SegmentInfo>,
94 ) -> Result<Snapshot, Culprit<error::ClientErr>> {
95 let uri = self.endpoint.build("/metastore/v1/commit")?;
96 let req = CommitRequest {
97 vid: vid.copy_to_bytes(),
98 cid: cid.copy_to_bytes(),
99 snapshot_lsn: snapshot_lsn.map(Into::into),
100 page_count: page_count.into(),
101 segments,
102 };
103 self.client
104 .send::<_, CommitResponse>(uri, req)
105 .map(|r| r.snapshot.expect("missing snapshot after commit"))
106 }
107}