graft_kernel/
remote.rs

1use std::{future, ops::Range, path::PathBuf};
2
3use bilrost::{Message, OwnedMessage};
4use bytes::Bytes;
5use culprit::ResultExt;
6use futures::{
7    Stream, StreamExt, TryStreamExt,
8    stream::{self, FuturesOrdered},
9};
10use graft_core::{
11    SegmentId, VolumeId,
12    cbe::CBE64,
13    checkpoints::{CachedCheckpoints, Checkpoints},
14    commit::Commit,
15    lsn::LSN,
16};
17use object_store::{
18    GetOptions, GetRange, ObjectStore, PutOptions, PutPayload, aws::S3ConditionalPut,
19    local::LocalFileSystem, memory::InMemory, path::Path, prefix::PrefixStore,
20};
21use serde::{Deserialize, Serialize};
22use thiserror::Error;
23
24pub mod segment;
25
/// Maximum number of commit fetches kept in flight at once by
/// [`Remote::stream_commits_ordered`].
const FETCH_COMMITS_CONCURRENCY: usize = 5;
27
/// Logical location of an object in the remote store.
///
/// Each variant maps to a concrete [`object_store::path::Path`] via
/// [`RemotePath::build`]; the layout is documented per variant.
enum RemotePath<'a> {
    /// `CheckpointSets` are stored at `/volumes/{vid}/checkpoints`
    CheckpointSet(&'a VolumeId),

    /// Commits are stored at `/volumes/{vid}/log/{CBE64 hex LSN}`
    Commit(&'a VolumeId, LSN),

    /// Segments are stored at `/segments/{sid}`
    Segment(&'a SegmentId),
}
38
39impl RemotePath<'_> {
40    fn build(self) -> object_store::path::Path {
41        match self {
42            Self::CheckpointSet(vid) => {
43                Path::from_iter(["volumes", &vid.serialize(), "checkpoints"])
44            }
45            Self::Commit(vid, lsn) => Path::from_iter([
46                "volumes",
47                &vid.serialize(),
48                "log",
49                &CBE64::from(lsn).to_string(),
50            ]),
51            Self::Segment(sid) => Path::from_iter(["segments", &sid.serialize()]),
52        }
53    }
54}
55
/// Errors returned by [`Remote`] operations.
#[derive(Error, Debug)]
pub enum RemoteErr {
    /// The underlying object store reported an error.
    #[error("Object store error: {0}")]
    ObjectStore(#[from] object_store::Error),

    /// A configured prefix could not be parsed into a valid path.
    #[error("Invalid path: {0}")]
    Path(#[from] object_store::path::Error),

    /// A fetched object failed bilrost decoding.
    #[error("Failed to decode file: {0}")]
    Decode(#[from] bilrost::DecodeError),
}
67
68impl RemoteErr {
69    pub fn is_already_exists(&self) -> bool {
70        matches!(
71            self,
72            Self::ObjectStore(object_store::Error::AlreadyExists { .. })
73        )
74    }
75
76    pub fn is_not_found(&self) -> bool {
77        matches!(
78            self,
79            Self::ObjectStore(object_store::Error::NotFound { .. })
80        )
81    }
82
83    pub fn is_not_modified(&self) -> bool {
84        matches!(
85            self,
86            Self::ObjectStore(object_store::Error::NotModified { .. })
87        )
88    }
89}
90
/// Result alias pairing [`RemoteErr`] with a `culprit` error context.
pub type Result<T> = culprit::Result<T, RemoteErr>;
92
/// Declarative configuration for constructing a [`Remote`].
///
/// Serialized/deserialized with an internally-tagged `type` field in
/// snake_case (e.g. `type = "s3_compatible"`).
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RemoteConfig {
    /// In memory object store
    #[default]
    Memory,

    /// On disk object store
    Fs { root: PathBuf },

    /// S3 compatible object store
    /// Can load most config and secrets from environment variables
    /// See `object_store::aws::builder::AmazonS3Builder` for env variable names
    S3Compatible {
        bucket: String,
        // optional key prefix under which all objects are stored
        prefix: Option<String>,
    },
}
111
impl RemoteConfig {
    /// Consumes the config and builds the corresponding [`Remote`].
    /// Convenience wrapper around [`Remote::with_config`].
    pub fn build(self) -> Result<Remote> {
        Remote::with_config(self)
    }
}
117
/// Handle to remote storage for checkpoints, commits, and segments,
/// backed by any [`ObjectStore`] implementation.
#[derive(Debug)]
pub struct Remote {
    // type-erased so every backend shares this one concrete type
    store: Box<dyn ObjectStore>,
}
122
123impl Remote {
124    pub fn with_config(config: RemoteConfig) -> Result<Self> {
125        let store: Box<dyn ObjectStore> = match config {
126            RemoteConfig::Memory => Box::new(InMemory::new()),
127            RemoteConfig::Fs { root } => Box::new(LocalFileSystem::new_with_prefix(root)?),
128            RemoteConfig::S3Compatible { bucket, prefix } => {
129                let store = object_store::aws::AmazonS3Builder::from_env()
130                    .with_allow_http(true)
131                    .with_bucket_name(bucket)
132                    .with_conditional_put(S3ConditionalPut::ETagMatch)
133                    .build()?;
134                if let Some(prefix) = prefix {
135                    let prefix = Path::parse(prefix)?;
136                    Box::new(PrefixStore::new(store, prefix))
137                } else {
138                    Box::new(store)
139                }
140            }
141        };
142
143        Ok(Self { store })
144    }
145
146    /// Fetches checkpoints for the specified volume. If `etag` is not `None`
147    /// then this method will return a not modified error.
148    #[tracing::instrument(level = "trace", skip(self))]
149    pub async fn get_checkpoints(
150        &self,
151        vid: &VolumeId,
152        etag: Option<String>,
153    ) -> Result<CachedCheckpoints> {
154        let path = RemotePath::CheckpointSet(vid).build();
155        let opts = GetOptions {
156            if_none_match: etag,
157            ..GetOptions::default()
158        };
159
160        let result = self.store.get_opts(&path, opts).await?;
161        let etag = result.meta.e_tag.clone();
162        let bytes = result.bytes().await?;
163
164        Ok(CachedCheckpoints::new(Checkpoints::decode(bytes)?, etag))
165    }
166
    /// Streams commits by LSN in the same order as the input iterator.
    /// Stops fetching commits as soon as we receive a `NotFound` error from the
    /// remote, thus even if `lsns` contains every LSN we will stop loading
    /// commits as soon as we reach the end of the log.
    pub fn stream_commits_ordered<I: IntoIterator<Item = LSN>>(
        &self,
        vid: &VolumeId,
        lsns: I,
    ) -> impl Stream<Item = Result<Commit>> {
        // convert the set into a stream of chunks, such that the first chunk
        // only contains the first LSN, and the remaining chunks have a maximum
        // size of FETCH_COMMITS_CONCURRENCY
        //
        // NOTE(review): the size-one first chunk presumably limits speculative
        // fetches to a single request until we know the log is non-empty —
        // confirm before changing the chunking.
        let mut lsns = lsns.into_iter();
        let first_chunk: Vec<LSN> = match lsns.next() {
            Some(lsn) => vec![lsn],
            None => vec![],
        };
        stream::once(future::ready(first_chunk))
            .chain(stream::iter(lsns).chunks(FETCH_COMMITS_CONCURRENCY))
            // fetch each chunk's commits concurrently; FuturesOrdered yields
            // results in the same order the LSNs appear in the chunk
            .flat_map(|chunk| {
                chunk
                    .into_iter()
                    .map(|lsn| self.get_commit(vid, lsn))
                    .collect::<FuturesOrdered<_>>()
            })
            // get_commit yields Ok(None) on NotFound; end the stream there
            .try_take_while(|result| future::ready(Ok(result.is_some())))
            // safe: try_take_while above only lets `Some` values through
            .map_ok(|result| result.unwrap())
    }
195
196    /// Fetches a single commit, returning None if the commit is not found.
197    #[tracing::instrument(level = "trace", skip(self, lsn), fields(lsn = %lsn))]
198    pub async fn get_commit(&self, vid: &VolumeId, lsn: LSN) -> Result<Option<Commit>> {
199        let path = RemotePath::Commit(vid, lsn).build();
200        match self.store.get(&path).await {
201            Ok(res) => Commit::decode(res.bytes().await?).or_into_ctx().map(Some),
202            Err(object_store::Error::NotFound { .. }) => Ok(None),
203            Err(err) => Err(err.into()),
204        }
205    }
206
207    /// Atomically write a commit to the remote, returning
208    /// `RemoteErr::ObjectStore(Error::AlreadyExists)` on a collision
209    #[tracing::instrument(level = "debug", skip(self, commit), fields(lsn = %commit.lsn()))]
210    pub async fn put_commit(&self, commit: &Commit) -> Result<()> {
211        let path = RemotePath::Commit(commit.vid(), commit.lsn()).build();
212        let payload = PutPayload::from_bytes(commit.encode_to_bytes());
213        self.store
214            .put_opts(
215                &path,
216                payload,
217                PutOptions {
218                    // Perform an atomic write operation, returning
219                    // AlreadyExists if the commit already exists
220                    mode: object_store::PutMode::Create,
221                    ..PutOptions::default()
222                },
223            )
224            .await?;
225        Ok(())
226    }
227
228    /// Uploads a segment to this Remote
229    #[tracing::instrument(level = "debug", skip(self, chunks), fields(size))]
230    pub async fn put_segment<I: IntoIterator<Item = Bytes>>(
231        &self,
232        sid: &SegmentId,
233        chunks: I,
234    ) -> Result<()> {
235        let path = RemotePath::Segment(sid).build();
236        let payload = PutPayload::from_iter(chunks);
237        tracing::Span::current().record("size", payload.content_length());
238        self.store.put(&path, payload).await?;
239        Ok(())
240    }
241
242    /// Reads a byte range of a segment
243    #[tracing::instrument(level = "debug", skip(self))]
244    pub async fn get_segment_range(&self, sid: &SegmentId, bytes: &Range<usize>) -> Result<Bytes> {
245        let path = RemotePath::Segment(sid).build();
246        let get_opts = GetOptions {
247            range: Some(GetRange::Bounded(bytes.start as u64..bytes.end as u64)),
248            ..GetOptions::default()
249        };
250        let result = self.store.get_opts(&path, get_opts).await?;
251        Ok(result.bytes().await?)
252    }
253
    /// TESTONLY: list contents of this remote in a tree-like format
    #[cfg(test)]
    pub async fn testonly_format_tree(&self) -> String {
        use itertools::Itertools;
        use std::collections::BTreeMap;
        use text_trees::{
            AnchorPosition, FormatCharacters, TreeFormatting, TreeNode, TreeOrientation,
        };

        // list every object in the store, splitting each location into its
        // path segments (e.g. ["volumes", "<vid>", "checkpoints"])
        let paths = self.store.list(None).map_ok(|obj| {
            obj.location
                .parts()
                .map(|p| p.as_ref().to_string())
                .collect_vec()
        });
        // test-only helper: listing failures just panic
        let paths: Vec<_> = paths.try_collect().await.unwrap();

        // intermediate trie of path segments; BTreeMap keeps sibling order
        // sorted so the rendered tree is deterministic
        #[derive(Default)]
        struct TreeBuilder {
            children: BTreeMap<String, TreeBuilder>,
        }

        impl TreeBuilder {
            // recursively insert one path, creating intermediate nodes
            fn insert(&mut self, parts: &[String]) {
                if parts.is_empty() {
                    return;
                }

                let first = &parts[0];
                let rest = &parts[1..];

                self.children.entry(first.clone()).or_default().insert(rest);
            }

            // recursively convert the trie into text_trees nodes
            fn to_tree_node(self, name: String) -> TreeNode<String> {
                if self.children.is_empty() {
                    // This is a leaf node
                    TreeNode::new(name)
                } else {
                    // This is a directory node
                    let child_nodes = self
                        .children
                        .into_iter()
                        .map(|(name, builder)| builder.to_tree_node(name));
                    TreeNode::with_child_nodes(name, child_nodes)
                }
            }
        }

        let mut root = TreeBuilder::default();
        for path in paths {
            root.insert(&path);
        }

        // label the root with the store's Display impl and render top-down
        // using box-drawing characters
        root.to_tree_node(self.store.to_string())
            .to_string_with_format(&TreeFormatting {
                prefix_str: None,
                orientation: TreeOrientation::TopDown,
                anchor: AnchorPosition::Left,
                chars: FormatCharacters::box_chars(),
            })
            .unwrap()
            .to_string()
    }
318}