graft_client/
pagestore.rs

1use bytes::Bytes;
2use culprit::Culprit;
3use graft_core::lsn::LSN;
4use graft_core::{VolumeId, page_count::PageCount};
5use graft_proto::{
6    common::v1::SegmentInfo,
7    pagestore::v1::{
8        PageAtIdx, ReadPagesRequest, ReadPagesResponse, WritePagesRequest, WritePagesResponse,
9    },
10};
11use std::sync::atomic::{AtomicU32, Ordering};
12use url::Url;
13
14use crate::NetClient;
15use crate::{ClientErr, net::EndpointBuilder};
16
17#[derive(Debug)]
18pub struct PagestoreClient {
19    endpoint: EndpointBuilder,
20    client: NetClient,
21    pages_read_count: AtomicU32,
22}
23
24impl PagestoreClient {
25    pub fn new(root: Url, client: NetClient) -> Self {
26        Self {
27            endpoint: root.into(),
28            client,
29            pages_read_count: AtomicU32::new(0),
30        }
31    }
32
33    pub fn read_pages(
34        &self,
35        vid: &VolumeId,
36        lsn: LSN,
37        graft: Bytes,
38    ) -> Result<Vec<PageAtIdx>, Culprit<ClientErr>> {
39        let uri = self.endpoint.build("/pagestore/v1/read_pages")?;
40        let req = ReadPagesRequest {
41            vid: vid.copy_to_bytes(),
42            lsn: lsn.into(),
43            graft,
44        };
45        let result = self
46            .client
47            .send::<_, ReadPagesResponse>(uri, req)
48            .map(|r| r.pages);
49
50        // Increment the counter with the number of pages read
51        if let Ok(ref pages) = result {
52            self.pages_read_count
53                .fetch_add(pages.len() as u32, Ordering::Relaxed);
54        }
55
56        result
57    }
58
59    pub fn write_pages(
60        &self,
61        vid: &VolumeId,
62        pages: Vec<PageAtIdx>,
63    ) -> Result<Vec<SegmentInfo>, Culprit<ClientErr>> {
64        let uri = self.endpoint.build("/pagestore/v1/write_pages")?;
65        let req = WritePagesRequest { vid: vid.copy_to_bytes(), pages };
66        self.client
67            .send::<_, WritePagesResponse>(uri, req)
68            .map(|r| r.segments)
69    }
70
71    /// Returns the total number of pages read by this client.
72    pub fn pages_read(&self) -> PageCount {
73        PageCount::new(self.pages_read_count.load(Ordering::Relaxed))
74    }
75
76    /// Resets the pages read counter to zero.
77    pub fn reset_pages_read(&self) {
78        self.pages_read_count.store(0, Ordering::Relaxed);
79    }
80}
81
82impl Clone for PagestoreClient {
83    fn clone(&self) -> Self {
84        Self {
85            endpoint: self.endpoint.clone(),
86            client: self.client.clone(),
87            pages_read_count: AtomicU32::new(0), // New counter for each clone
88        }
89    }
90}