graft_client/
metastore.rs

1use bytes::Bytes;
2use culprit::{Culprit, ResultExt};
3use graft_core::{VolumeId, gid::ClientId, lsn::LSN, page_count::PageCount};
4use graft_proto::{
5    common::v1::{Commit, LsnRange, SegmentInfo, Snapshot},
6    metastore::v1::{
7        CommitRequest, CommitResponse, PullCommitsRequest, PullCommitsResponse, PullGraftRequest,
8        PullGraftResponse, SnapshotRequest, SnapshotResponse,
9    },
10};
11use splinter_rs::SplinterRef;
12use std::ops::RangeBounds;
13use url::Url;
14
15use crate::NetClient;
16use crate::{error, net::EndpointBuilder};
17
18#[derive(Debug, Clone)]
19pub struct MetastoreClient {
20    endpoint: EndpointBuilder,
21    client: NetClient,
22}
23
24impl MetastoreClient {
25    pub fn new(root: Url, client: NetClient) -> Self {
26        Self { endpoint: root.into(), client }
27    }
28
29    pub fn snapshot(
30        &self,
31        vid: &VolumeId,
32        lsn: Option<LSN>,
33    ) -> Result<Option<Snapshot>, Culprit<error::ClientErr>> {
34        let uri = self.endpoint.build("/metastore/v1/snapshot")?;
35        let req = SnapshotRequest {
36            vid: vid.copy_to_bytes(),
37            lsn: lsn.map(Into::into),
38        };
39        match self.client.send::<_, SnapshotResponse>(uri, req) {
40            Ok(resp) => Ok(resp.snapshot),
41            Err(err) if err.ctx().is_snapshot_missing() => Ok(None),
42            Err(err) => Err(err),
43        }
44    }
45
46    #[allow(clippy::type_complexity)]
47    pub fn pull_graft<R: RangeBounds<LSN>>(
48        &self,
49        vid: &VolumeId,
50        range: R,
51    ) -> Result<Option<(Snapshot, LsnRange, SplinterRef<Bytes>)>, Culprit<error::ClientErr>> {
52        let uri = self.endpoint.build("/metastore/v1/pull_graft")?;
53        let req = PullGraftRequest {
54            vid: vid.copy_to_bytes(),
55            range: Some(LsnRange::from_range(range)),
56        };
57        match self.client.send::<_, PullGraftResponse>(uri, req) {
58            Ok(resp) => {
59                let snapshot = resp.snapshot.expect("snapshot is missing");
60                let range = resp.range.expect("range is missing");
61                let graft = SplinterRef::from_bytes(resp.graft).or_into_ctx()?;
62                Ok(Some((snapshot, range, graft)))
63            }
64            Err(err) if err.ctx().is_snapshot_missing() => Ok(None),
65            Err(err) => Err(err),
66        }
67    }
68
69    pub fn pull_commits<R>(
70        &self,
71        vid: &VolumeId,
72        range: R,
73    ) -> Result<Vec<Commit>, Culprit<error::ClientErr>>
74    where
75        R: RangeBounds<LSN>,
76    {
77        let uri = self.endpoint.build("/metastore/v1/pull_commits")?;
78        let req = PullCommitsRequest {
79            vid: vid.copy_to_bytes(),
80            range: Some(LsnRange::from_range(range)),
81        };
82        self.client
83            .send::<_, PullCommitsResponse>(uri, req)
84            .map(|resp| resp.commits)
85    }
86
87    pub fn commit(
88        &self,
89        vid: &VolumeId,
90        cid: &ClientId,
91        snapshot_lsn: Option<LSN>,
92        page_count: PageCount,
93        segments: Vec<SegmentInfo>,
94    ) -> Result<Snapshot, Culprit<error::ClientErr>> {
95        let uri = self.endpoint.build("/metastore/v1/commit")?;
96        let req = CommitRequest {
97            vid: vid.copy_to_bytes(),
98            cid: cid.copy_to_bytes(),
99            snapshot_lsn: snapshot_lsn.map(Into::into),
100            page_count: page_count.into(),
101            segments,
102        };
103        self.client
104            .send::<_, CommitResponse>(uri, req)
105            .map(|r| r.snapshot.expect("missing snapshot after commit"))
106    }
107}