1use std::{future, ops::Range, path::PathBuf};
2
3use bilrost::{Message, OwnedMessage};
4use bytes::Bytes;
5use culprit::ResultExt;
6use futures::{
7 Stream, StreamExt, TryStreamExt,
8 stream::{self, FuturesOrdered},
9};
10use graft_core::{
11 SegmentId, VolumeId,
12 cbe::CBE64,
13 checkpoints::{CachedCheckpoints, Checkpoints},
14 commit::Commit,
15 lsn::LSN,
16};
17use object_store::{
18 GetOptions, GetRange, ObjectStore, PutOptions, PutPayload, aws::S3ConditionalPut,
19 local::LocalFileSystem, memory::InMemory, path::Path, prefix::PrefixStore,
20};
21use serde::{Deserialize, Serialize};
22use thiserror::Error;
23
24pub mod segment;
25
/// Maximum number of LSNs grouped into one chunk by
/// `Remote::stream_commits_ordered`; the commits of a chunk are requested
/// together, so this bounds the number of in-flight commit fetches.
const FETCH_COMMITS_CONCURRENCY: usize = 5;
27
28enum RemotePath<'a> {
29 CheckpointSet(&'a VolumeId),
31
32 Commit(&'a VolumeId, LSN),
34
35 Segment(&'a SegmentId),
37}
38
39impl RemotePath<'_> {
40 fn build(self) -> object_store::path::Path {
41 match self {
42 Self::CheckpointSet(vid) => {
43 Path::from_iter(["volumes", &vid.serialize(), "checkpoints"])
44 }
45 Self::Commit(vid, lsn) => Path::from_iter([
46 "volumes",
47 &vid.serialize(),
48 "log",
49 &CBE64::from(lsn).to_string(),
50 ]),
51 Self::Segment(sid) => Path::from_iter(["segments", &sid.serialize()]),
52 }
53 }
54}
55
56#[derive(Error, Debug)]
57pub enum RemoteErr {
58 #[error("Object store error: {0}")]
59 ObjectStore(#[from] object_store::Error),
60
61 #[error("Invalid path: {0}")]
62 Path(#[from] object_store::path::Error),
63
64 #[error("Failed to decode file: {0}")]
65 Decode(#[from] bilrost::DecodeError),
66}
67
68impl RemoteErr {
69 pub fn is_already_exists(&self) -> bool {
70 matches!(
71 self,
72 Self::ObjectStore(object_store::Error::AlreadyExists { .. })
73 )
74 }
75
76 pub fn is_not_found(&self) -> bool {
77 matches!(
78 self,
79 Self::ObjectStore(object_store::Error::NotFound { .. })
80 )
81 }
82
83 pub fn is_not_modified(&self) -> bool {
84 matches!(
85 self,
86 Self::ObjectStore(object_store::Error::NotModified { .. })
87 )
88 }
89}
90
91pub type Result<T> = culprit::Result<T, RemoteErr>;
92
93#[derive(Debug, Deserialize, Serialize, Default, Clone)]
94#[serde(tag = "type", rename_all = "snake_case")]
95pub enum RemoteConfig {
96 #[default]
98 Memory,
99
100 Fs { root: PathBuf },
102
103 S3Compatible {
107 bucket: String,
108 prefix: Option<String>,
109 },
110}
111
112impl RemoteConfig {
113 pub fn build(self) -> Result<Remote> {
114 Remote::with_config(self)
115 }
116}
117
118#[derive(Debug)]
119pub struct Remote {
120 store: Box<dyn ObjectStore>,
121}
122
123impl Remote {
124 pub fn with_config(config: RemoteConfig) -> Result<Self> {
125 let store: Box<dyn ObjectStore> = match config {
126 RemoteConfig::Memory => Box::new(InMemory::new()),
127 RemoteConfig::Fs { root } => Box::new(LocalFileSystem::new_with_prefix(root)?),
128 RemoteConfig::S3Compatible { bucket, prefix } => {
129 let store = object_store::aws::AmazonS3Builder::from_env()
130 .with_allow_http(true)
131 .with_bucket_name(bucket)
132 .with_conditional_put(S3ConditionalPut::ETagMatch)
133 .build()?;
134 if let Some(prefix) = prefix {
135 let prefix = Path::parse(prefix)?;
136 Box::new(PrefixStore::new(store, prefix))
137 } else {
138 Box::new(store)
139 }
140 }
141 };
142
143 Ok(Self { store })
144 }
145
146 #[tracing::instrument(level = "trace", skip(self))]
149 pub async fn get_checkpoints(
150 &self,
151 vid: &VolumeId,
152 etag: Option<String>,
153 ) -> Result<CachedCheckpoints> {
154 let path = RemotePath::CheckpointSet(vid).build();
155 let opts = GetOptions {
156 if_none_match: etag,
157 ..GetOptions::default()
158 };
159
160 let result = self.store.get_opts(&path, opts).await?;
161 let etag = result.meta.e_tag.clone();
162 let bytes = result.bytes().await?;
163
164 Ok(CachedCheckpoints::new(Checkpoints::decode(bytes)?, etag))
165 }
166
167 pub fn stream_commits_ordered<I: IntoIterator<Item = LSN>>(
172 &self,
173 vid: &VolumeId,
174 lsns: I,
175 ) -> impl Stream<Item = Result<Commit>> {
176 let mut lsns = lsns.into_iter();
180 let first_chunk: Vec<LSN> = match lsns.next() {
181 Some(lsn) => vec![lsn],
182 None => vec![],
183 };
184 stream::once(future::ready(first_chunk))
185 .chain(stream::iter(lsns).chunks(FETCH_COMMITS_CONCURRENCY))
186 .flat_map(|chunk| {
187 chunk
188 .into_iter()
189 .map(|lsn| self.get_commit(vid, lsn))
190 .collect::<FuturesOrdered<_>>()
191 })
192 .try_take_while(|result| future::ready(Ok(result.is_some())))
193 .map_ok(|result| result.unwrap())
194 }
195
196 #[tracing::instrument(level = "trace", skip(self, lsn), fields(lsn = %lsn))]
198 pub async fn get_commit(&self, vid: &VolumeId, lsn: LSN) -> Result<Option<Commit>> {
199 let path = RemotePath::Commit(vid, lsn).build();
200 match self.store.get(&path).await {
201 Ok(res) => Commit::decode(res.bytes().await?).or_into_ctx().map(Some),
202 Err(object_store::Error::NotFound { .. }) => Ok(None),
203 Err(err) => Err(err.into()),
204 }
205 }
206
207 #[tracing::instrument(level = "debug", skip(self, commit), fields(lsn = %commit.lsn()))]
210 pub async fn put_commit(&self, commit: &Commit) -> Result<()> {
211 let path = RemotePath::Commit(commit.vid(), commit.lsn()).build();
212 let payload = PutPayload::from_bytes(commit.encode_to_bytes());
213 self.store
214 .put_opts(
215 &path,
216 payload,
217 PutOptions {
218 mode: object_store::PutMode::Create,
221 ..PutOptions::default()
222 },
223 )
224 .await?;
225 Ok(())
226 }
227
228 #[tracing::instrument(level = "debug", skip(self, chunks), fields(size))]
230 pub async fn put_segment<I: IntoIterator<Item = Bytes>>(
231 &self,
232 sid: &SegmentId,
233 chunks: I,
234 ) -> Result<()> {
235 let path = RemotePath::Segment(sid).build();
236 let payload = PutPayload::from_iter(chunks);
237 tracing::Span::current().record("size", payload.content_length());
238 self.store.put(&path, payload).await?;
239 Ok(())
240 }
241
242 #[tracing::instrument(level = "debug", skip(self))]
244 pub async fn get_segment_range(&self, sid: &SegmentId, bytes: &Range<usize>) -> Result<Bytes> {
245 let path = RemotePath::Segment(sid).build();
246 let get_opts = GetOptions {
247 range: Some(GetRange::Bounded(bytes.start as u64..bytes.end as u64)),
248 ..GetOptions::default()
249 };
250 let result = self.store.get_opts(&path, get_opts).await?;
251 Ok(result.bytes().await?)
252 }
253
254 #[cfg(test)]
256 pub async fn testonly_format_tree(&self) -> String {
257 use itertools::Itertools;
258 use std::collections::BTreeMap;
259 use text_trees::{
260 AnchorPosition, FormatCharacters, TreeFormatting, TreeNode, TreeOrientation,
261 };
262
263 let paths = self.store.list(None).map_ok(|obj| {
264 obj.location
265 .parts()
266 .map(|p| p.as_ref().to_string())
267 .collect_vec()
268 });
269 let paths: Vec<_> = paths.try_collect().await.unwrap();
270
271 #[derive(Default)]
272 struct TreeBuilder {
273 children: BTreeMap<String, TreeBuilder>,
274 }
275
276 impl TreeBuilder {
277 fn insert(&mut self, parts: &[String]) {
278 if parts.is_empty() {
279 return;
280 }
281
282 let first = &parts[0];
283 let rest = &parts[1..];
284
285 self.children.entry(first.clone()).or_default().insert(rest);
286 }
287
288 fn to_tree_node(self, name: String) -> TreeNode<String> {
289 if self.children.is_empty() {
290 TreeNode::new(name)
292 } else {
293 let child_nodes = self
295 .children
296 .into_iter()
297 .map(|(name, builder)| builder.to_tree_node(name));
298 TreeNode::with_child_nodes(name, child_nodes)
299 }
300 }
301 }
302
303 let mut root = TreeBuilder::default();
304 for path in paths {
305 root.insert(&path);
306 }
307
308 root.to_tree_node(self.store.to_string())
309 .to_string_with_format(&TreeFormatting {
310 prefix_str: None,
311 orientation: TreeOrientation::TopDown,
312 anchor: AnchorPosition::Left,
313 chars: FormatCharacters::box_chars(),
314 })
315 .unwrap()
316 .to_string()
317 }
318}