graft_client/
pagestore.rs1use bytes::Bytes;
2use culprit::Culprit;
3use graft_core::lsn::LSN;
4use graft_core::{VolumeId, page_count::PageCount};
5use graft_proto::{
6 common::v1::SegmentInfo,
7 pagestore::v1::{
8 PageAtIdx, ReadPagesRequest, ReadPagesResponse, WritePagesRequest, WritePagesResponse,
9 },
10};
11use std::sync::atomic::{AtomicU32, Ordering};
12use url::Url;
13
14use crate::NetClient;
15use crate::{ClientErr, net::EndpointBuilder};
16
17#[derive(Debug)]
18pub struct PagestoreClient {
19 endpoint: EndpointBuilder,
20 client: NetClient,
21 pages_read_count: AtomicU32,
22}
23
24impl PagestoreClient {
25 pub fn new(root: Url, client: NetClient) -> Self {
26 Self {
27 endpoint: root.into(),
28 client,
29 pages_read_count: AtomicU32::new(0),
30 }
31 }
32
33 pub fn read_pages(
34 &self,
35 vid: &VolumeId,
36 lsn: LSN,
37 graft: Bytes,
38 ) -> Result<Vec<PageAtIdx>, Culprit<ClientErr>> {
39 let uri = self.endpoint.build("/pagestore/v1/read_pages")?;
40 let req = ReadPagesRequest {
41 vid: vid.copy_to_bytes(),
42 lsn: lsn.into(),
43 graft,
44 };
45 let result = self
46 .client
47 .send::<_, ReadPagesResponse>(uri, req)
48 .map(|r| r.pages);
49
50 if let Ok(ref pages) = result {
52 self.pages_read_count
53 .fetch_add(pages.len() as u32, Ordering::Relaxed);
54 }
55
56 result
57 }
58
59 pub fn write_pages(
60 &self,
61 vid: &VolumeId,
62 pages: Vec<PageAtIdx>,
63 ) -> Result<Vec<SegmentInfo>, Culprit<ClientErr>> {
64 let uri = self.endpoint.build("/pagestore/v1/write_pages")?;
65 let req = WritePagesRequest { vid: vid.copy_to_bytes(), pages };
66 self.client
67 .send::<_, WritePagesResponse>(uri, req)
68 .map(|r| r.segments)
69 }
70
71 pub fn pages_read(&self) -> PageCount {
73 PageCount::new(self.pages_read_count.load(Ordering::Relaxed))
74 }
75
76 pub fn reset_pages_read(&self) {
78 self.pages_read_count.store(0, Ordering::Relaxed);
79 }
80}
81
82impl Clone for PagestoreClient {
83 fn clone(&self) -> Self {
84 Self {
85 endpoint: self.endpoint.clone(),
86 client: self.client.clone(),
87 pages_read_count: AtomicU32::new(0), }
89 }
90}