libsql_wal/storage/backend/s3.rs

1//! S3 implementation of storage backend
2
3use std::fmt::{self, Formatter};
4use std::mem::size_of;
5use std::path::Path;
6use std::pin::Pin;
7use std::str::FromStr;
8use std::sync::Arc;
9use std::task::Poll;
10
11use aws_config::SdkConfig;
12use aws_sdk_s3::operation::create_bucket::CreateBucketError;
13use aws_sdk_s3::operation::get_object::GetObjectOutput;
14use aws_sdk_s3::primitives::{ByteStream, SdkBody};
15use aws_sdk_s3::types::{CreateBucketConfiguration, Object};
16use aws_sdk_s3::Client;
17use bytes::{Bytes, BytesMut};
18use chrono::{DateTime, Utc};
19use http_body::{Frame as HttpFrame, SizeHint};
20use libsql_sys::name::NamespaceName;
21use roaring::RoaringBitmap;
22use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
23use tokio_stream::Stream;
24use tokio_util::sync::ReusableBoxFuture;
25use zerocopy::byteorder::little_endian::{U16 as lu16, U32 as lu32, U64 as lu64};
26use zerocopy::{AsBytes, FromBytes, FromZeroes};
27
28use super::{Backend, FindSegmentReq, SegmentMeta};
29use crate::io::buf::ZeroCopyBuf;
30use crate::io::compat::copy_to_file;
31use crate::io::{FileExt, Io, StdIO};
32use crate::segment::compacted::CompactedSegmentDataHeader;
33use crate::segment::Frame;
34use crate::storage::{Error, RestoreOptions, Result, SegmentInfo, SegmentKey};
35use crate::LIBSQL_MAGIC;
36
37pub struct S3Backend<IO> {
38    client: Client,
39    default_config: Arc<S3Config>,
40    io: IO,
41}
42impl S3Backend<StdIO> {
43    pub async fn from_sdk_config(
44        aws_config: SdkConfig,
45        bucket: String,
46        cluster_id: String,
47    ) -> Result<S3Backend<StdIO>> {
48        Self::from_sdk_config_with_io(aws_config, bucket, cluster_id, StdIO(())).await
49    }
50}
51
52/// Header for segment index stored into s3
53#[repr(C)]
54#[derive(Copy, Clone, Debug, AsBytes, FromZeroes, FromBytes)]
55struct SegmentIndexHeader {
56    magic: lu64,
57    version: lu16,
58    len: lu64,
59    checksum: lu32,
60}
61
62impl<IO: Io> S3Backend<IO> {
63    #[doc(hidden)]
64    pub async fn from_sdk_config_with_io(
65        aws_config: SdkConfig,
66        bucket: String,
67        cluster_id: String,
68        io: IO,
69    ) -> Result<Self> {
70        let config = aws_sdk_s3::Config::new(&aws_config)
71            .to_builder()
72            .force_path_style(true)
73            .build();
74
75        let region = config.region().expect("region must be configured").clone();
76
77        let client = Client::from_conf(config);
78        let config = S3Config {
79            bucket,
80            cluster_id,
81            aws_config,
82        };
83
84        let bucket_config = CreateBucketConfiguration::builder()
85            .location_constraint(
86                aws_sdk_s3::types::BucketLocationConstraint::from_str(&region.to_string()).unwrap(),
87            )
88            .build();
89
90        // TODO: we may need to create the bucket for config overrides. Maybe try lazy bucket
91        // creation? or assume that the bucket exists?
92        let create_bucket_ret = client
93            .create_bucket()
94            .create_bucket_configuration(bucket_config)
95            .bucket(&config.bucket)
96            .send()
97            .await;
98
99        match create_bucket_ret {
100            Ok(_) => (),
101            Err(e) => {
102                if let Some(service_error) = e.as_service_error() {
103                    match service_error {
104                        CreateBucketError::BucketAlreadyExists(_)
105                        | CreateBucketError::BucketAlreadyOwnedByYou(_) => {
106                            tracing::debug!("bucket already exist");
107                        }
108                        _ => return Err(Error::unhandled(e, "failed to create bucket")),
109                    }
110                } else {
111                    return Err(Error::unhandled(e, "failed to create bucket"));
112                }
113            }
114        }
115
116        Ok(Self {
117            client,
118            default_config: config.into(),
119            io,
120        })
121    }
122
123    async fn fetch_segment_data_reader(
124        &self,
125        config: &S3Config,
126        folder_key: &FolderKey<'_>,
127        segment_key: &SegmentKey,
128    ) -> Result<impl AsyncRead> {
129        let key = s3_segment_data_key(folder_key, segment_key);
130        let stream = self.s3_get(config, key).await?;
131        Ok(stream.body.into_async_read())
132    }
133
134    async fn fetch_segment_data_inner(
135        &self,
136        config: &S3Config,
137        folder_key: &FolderKey<'_>,
138        segment_key: &SegmentKey,
139        file: &impl FileExt,
140    ) -> Result<CompactedSegmentDataHeader> {
141        let reader = self
142            .fetch_segment_data_reader(config, folder_key, segment_key)
143            .await?;
144        let mut reader = tokio::io::BufReader::with_capacity(8196, reader);
145        while reader.fill_buf().await?.len() < size_of::<CompactedSegmentDataHeader>() {}
146        let header = CompactedSegmentDataHeader::read_from_prefix(reader.buffer()).unwrap();
147
148        copy_to_file(reader, file).await?;
149
150        Ok(header)
151    }
152
153    async fn s3_get(&self, config: &S3Config, key: impl ToString) -> Result<GetObjectOutput> {
154        Ok(self
155            .client
156            .get_object()
157            .bucket(&config.bucket)
158            .key(key.to_string())
159            .send()
160            .await
161            .map_err(|e| Error::unhandled(e, "error sending s3 GET request"))?)
162    }
163
164    async fn s3_put(&self, config: &S3Config, key: impl ToString, body: ByteStream) -> Result<()> {
165        self.client
166            .put_object()
167            .bucket(&config.bucket)
168            .body(body)
169            .key(key.to_string())
170            .send()
171            .await
172            .map_err(|e| Error::unhandled(e, "error sending s3 PUT request"))?;
173        Ok(())
174    }
175
176    async fn fetch_segment_index_inner(
177        &self,
178        config: &S3Config,
179        folder_key: &FolderKey<'_>,
180        segment_key: &SegmentKey,
181    ) -> Result<fst::Map<Arc<[u8]>>> {
182        let s3_index_key = s3_segment_index_key(folder_key, segment_key);
183        let mut stream = self
184            .s3_get(config, s3_index_key)
185            .await?
186            .body
187            .into_async_read();
188        let mut header: SegmentIndexHeader = SegmentIndexHeader::new_zeroed();
189        stream.read_exact(header.as_bytes_mut()).await?;
190        if header.magic.get() != LIBSQL_MAGIC && header.version.get() != 1 {
191            return Err(Error::InvalidIndex("index header magic or version invalid"));
192        }
193        let mut data = Vec::with_capacity(header.len.get() as _);
194        while stream.read_buf(&mut data).await? != 0 {}
195        let checksum = crc32fast::hash(&data);
196        if checksum != header.checksum.get() {
197            return Err(Error::InvalidIndex("invalid index data checksum"));
198        }
199        let index =
200            fst::Map::new(data.into()).map_err(|_| Error::InvalidIndex("invalid index bytes"))?;
201        Ok(index)
202    }
203
204    /// Find the most recent, and biggest segment that may contain `frame_no`
205    async fn find_segment_by_frame_no(
206        &self,
207        config: &S3Config,
208        folder_key: &FolderKey<'_>,
209        frame_no: u64,
210    ) -> Result<Option<SegmentKey>> {
211        let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key);
212        let lookup_key = s3_segment_index_ends_before_lookup_key(&folder_key, frame_no);
213
214        let objects = self
215            .client
216            .list_objects_v2()
217            .bucket(&config.bucket)
218            .prefix(lookup_key_prefix.to_string())
219            .start_after(lookup_key.to_string())
220            .send()
221            .await
222            .map_err(|e| Error::unhandled(e, "failed to list bucket"))?;
223
224        let Some(contents) = objects.contents().first() else {
225            return Ok(None);
226        };
227        let key = contents.key().expect("misssing key?");
228        let key_path: &Path = key.as_ref();
229
230        let key = SegmentKey::validate_from_path(key_path, &folder_key.namespace);
231
232        Ok(key)
233    }
234
235    /// We are kinda bruteforcing out way into finding a segment that fits the bill, this can very
236    /// probably be optimized
237    #[tracing::instrument(skip(self, config, folder_key))]
238    async fn find_segment_by_timestamp(
239        &self,
240        config: &S3Config,
241        folder_key: &FolderKey<'_>,
242        timestamp: DateTime<Utc>,
243    ) -> Result<Option<SegmentKey>> {
244        let object_to_key = |o: &Object| {
245            let key_path = o.key().unwrap();
246            SegmentKey::validate_from_path(key_path.as_ref(), &folder_key.namespace)
247        };
248
249        let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key);
250
251        let mut continuation_token = None;
252        loop {
253            let objects = self
254                .client
255                .list_objects_v2()
256                .set_continuation_token(continuation_token.take())
257                .bucket(&config.bucket)
258                .prefix(lookup_key_prefix.to_string())
259                .send()
260                .await
261                .map_err(|e| Error::unhandled(e, "failed to list bucket"))?;
262
263            // there is noting to restore
264            if objects.contents().is_empty() {
265                return Ok(None);
266            }
267
268            let ts = timestamp.timestamp_millis() as u64;
269            let search_result =
270                objects
271                    .contents()
272                    .binary_search_by_key(&std::cmp::Reverse(ts), |o| {
273                        let key = object_to_key(o).unwrap();
274                        std::cmp::Reverse(key.timestamp)
275                    });
276
277            match search_result {
278                Ok(i) => {
279                    let key = object_to_key(&objects.contents()[i]).unwrap();
280                    tracing::trace!("found perfect match for `{timestamp}`: {key}");
281                    return Ok(Some(key));
282                }
283                Err(i) if i == 0 => {
284                    // this is caught by the first iteration of the loop, anything that's more
285                    // recent than the most recent should be interpret as most recent
286                    let key = object_to_key(&objects.contents()[i]).unwrap();
287                    tracing::trace!("best match for `{timestamp}` is most recent segment: {key}");
288                    return Ok(Some(key));
289                }
290                Err(i) if i == objects.contents().len() => {
291                    // there are two scenarios. Either there are more pages with the request, and
292                    // we fetch older entries, or there aren't. If there are older segment, search
293                    // in those, otherwise, just take the oldest segment and return that
294                    if objects.continuation_token().is_some() {
295                        // nothing to do; fetch next page
296                    } else {
297                        let key = object_to_key(&objects.contents().last().unwrap()).unwrap();
298                        return Ok(Some(key));
299                    }
300                }
301                // This is the index where timestamp would be inserted, we look left and right of that
302                // key and pick the closest one.
303                Err(i) => {
304                    // i - 1 is well defined since we already catch the case where i == 0 above
305                    let left_key = object_to_key(&objects.contents()[i - 1]).unwrap();
306                    let right_key = object_to_key(&objects.contents()[i]).unwrap();
307                    let time_to_left = left_key.timestamp().signed_duration_since(timestamp).abs();
308                    let time_to_right =
309                        right_key.timestamp().signed_duration_since(timestamp).abs();
310
311                    if time_to_left < time_to_right {
312                        return Ok(Some(left_key));
313                    } else {
314                        return Ok(Some(right_key));
315                    }
316                }
317            }
318
319            match objects.continuation_token {
320                Some(token) => {
321                    continuation_token = Some(token);
322                }
323                None => {
324                    unreachable!("the absence of continuation token should be dealt with earlier");
325                }
326            }
327        }
328    }
329
330    // This method could probably be optimized a lot by using indexes and only downloading useful
331    // segments
332    async fn restore_latest(
333        &self,
334        config: &S3Config,
335        namespace: &NamespaceName,
336        dest: impl FileExt,
337    ) -> Result<()> {
338        let folder_key = FolderKey {
339            cluster_id: &config.cluster_id,
340            namespace,
341        };
342        let Some(latest_key) = self
343            .find_segment_by_frame_no(config, &folder_key, u64::MAX)
344            .await?
345        else {
346            tracing::info!("nothing to restore for {namespace}");
347            return Ok(());
348        };
349
350        let reader = self
351            .fetch_segment_data_reader(config, &folder_key, &latest_key)
352            .await?;
353        let mut reader = BufReader::new(reader);
354        let mut header: CompactedSegmentDataHeader = CompactedSegmentDataHeader::new_zeroed();
355        reader.read_exact(header.as_bytes_mut()).await?;
356        let db_size = header.size_after.get();
357        let mut seen = RoaringBitmap::new();
358        let mut frame: Frame = Frame::new_zeroed();
359        loop {
360            for _ in 0..header.frame_count.get() {
361                reader.read_exact(frame.as_bytes_mut()).await?;
362                let page_no = frame.header().page_no();
363                if !seen.contains(page_no) {
364                    seen.insert(page_no);
365                    let offset = (page_no as u64 - 1) * 4096;
366                    let buf = ZeroCopyBuf::new_init(frame).map_slice(|f| f.get_ref().data());
367                    let (buf, ret) = dest.write_all_at_async(buf, offset).await;
368                    ret?;
369                    frame = buf.into_inner().into_inner();
370                }
371            }
372
373            // db is restored
374            if seen.len() == db_size as u64 {
375                break;
376            }
377
378            let next_frame_no = header.start_frame_no.get() - 1;
379            let Some(key) = self
380                .find_segment_by_frame_no(config, &folder_key, next_frame_no)
381                .await?
382            else {
383                todo!("there should be a segment!");
384            };
385            let r = self
386                .fetch_segment_data_reader(config, &folder_key, &key)
387                .await?;
388            reader = BufReader::new(r);
389            reader.read_exact(header.as_bytes_mut()).await?;
390        }
391
392        Ok(())
393    }
394
395    async fn fetch_segment_from_key(
396        &self,
397        config: &S3Config,
398        folder_key: &FolderKey<'_>,
399        segment_key: &SegmentKey,
400        dest_file: &impl FileExt,
401    ) -> Result<fst::Map<Arc<[u8]>>> {
402        let (_, index) = tokio::try_join!(
403            self.fetch_segment_data_inner(config, &folder_key, &segment_key, dest_file),
404            self.fetch_segment_index_inner(config, &folder_key, &segment_key),
405        )?;
406
407        Ok(index)
408    }
409
410    fn list_segments_inner<'a>(
411        &'a self,
412        config: Arc<S3Config>,
413        namespace: &'a NamespaceName,
414        _until: u64,
415    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
416        async_stream::try_stream! {
417            let folder_key = FolderKey { cluster_id: &config.cluster_id, namespace };
418            let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key);
419
420            let mut continuation_token = None;
421            loop {
422                let objects = self
423                    .client
424                    .list_objects_v2()
425                    .bucket(&config.bucket)
426                    .prefix(lookup_key_prefix.to_string())
427                    .set_continuation_token(continuation_token.take())
428                    .send()
429                    .await
430                    .map_err(|e| Error::unhandled(e, "failed to list bucket"))?;
431
432                for entry in objects.contents() {
433                    let key = entry.key().expect("misssing key?");
434                    let key_path: &Path = key.as_ref();
435                    let Some(key) = SegmentKey::validate_from_path(key_path, &folder_key.namespace) else { continue };
436
437                    let infos = SegmentInfo {
438                        key,
439                        size: entry.size().unwrap_or(0) as usize,
440                    };
441
442                    yield infos;
443                }
444
445                if objects.is_truncated().unwrap_or(false) {
446                    assert!(objects.next_continuation_token.is_some());
447                    continuation_token = objects.next_continuation_token;
448                } else {
449                    break
450                }
451            }
452        }
453    }
454}
455
456pub struct S3Config {
457    bucket: String,
458    aws_config: SdkConfig,
459    cluster_id: String,
460}
461
462struct FolderKey<'a> {
463    cluster_id: &'a str,
464    namespace: &'a NamespaceName,
465}
466
467impl fmt::Display for FolderKey<'_> {
468    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
469        write!(
470            f,
471            "v2/clusters/{}/namespaces/{}",
472            self.cluster_id, self.namespace
473        )
474    }
475}
476
477pub struct SegmentDataKey<'a>(&'a FolderKey<'a>, &'a SegmentKey);
478
479impl fmt::Display for SegmentDataKey<'_> {
480    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
481        write!(f, "{}/segments/{}", self.0, self.1)
482    }
483}
484
485fn s3_segment_data_key<'a>(
486    folder_key: &'a FolderKey,
487    segment_key: &'a SegmentKey,
488) -> SegmentDataKey<'a> {
489    SegmentDataKey(folder_key, segment_key)
490}
491
492pub struct SegmentIndexKey<'a>(&'a FolderKey<'a>, &'a SegmentKey);
493
494impl fmt::Display for SegmentIndexKey<'_> {
495    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
496        write!(f, "{}/indexes/{}", self.0, self.1)
497    }
498}
499
500fn s3_segment_index_key<'a>(
501    folder_key: &'a FolderKey,
502    segment_key: &'a SegmentKey,
503) -> SegmentIndexKey<'a> {
504    SegmentIndexKey(folder_key, segment_key)
505}
506
507pub struct SegmentIndexLookupKeyPrefix<'a>(&'a FolderKey<'a>);
508
509impl fmt::Display for SegmentIndexLookupKeyPrefix<'_> {
510    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
511        write!(f, "{}/indexes/", self.0)
512    }
513}
514
515fn s3_segment_index_lookup_key_prefix<'a>(
516    folder_key: &'a FolderKey,
517) -> SegmentIndexLookupKeyPrefix<'a> {
518    SegmentIndexLookupKeyPrefix(folder_key)
519}
520
521pub struct SegmentIndexLookupKey<'a>(&'a FolderKey<'a>, u64);
522
523impl fmt::Display for SegmentIndexLookupKey<'_> {
524    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
525        write!(f, "{}/indexes/{:020}", self.0, u64::MAX - self.1)
526    }
527}
528
529/// return the biggest segment whose end frame number is less than frame_no
530fn s3_segment_index_ends_before_lookup_key<'a>(
531    folder_key: &'a FolderKey,
532    frame_no: u64,
533) -> SegmentIndexLookupKey<'a> {
534    SegmentIndexLookupKey(folder_key, frame_no)
535}
536
537impl<IO> Backend for S3Backend<IO>
538where
539    IO: Io,
540{
541    type Config = Arc<S3Config>;
542
543    async fn store(
544        &self,
545        config: &Self::Config,
546        meta: SegmentMeta,
547        segment_data: impl FileExt,
548        segment_index: Vec<u8>,
549    ) -> Result<()> {
550        let folder_key = FolderKey {
551            cluster_id: &config.cluster_id,
552            namespace: &meta.namespace,
553        };
554        let segment_key = SegmentKey::from(&meta);
555        let s3_data_key = s3_segment_data_key(&folder_key, &segment_key);
556
557        let body = FileStreamBody::new(segment_data).into_byte_stream();
558
559        self.s3_put(config, s3_data_key, body).await?;
560
561        let s3_index_key = s3_segment_index_key(&folder_key, &segment_key);
562
563        let checksum = crc32fast::hash(&segment_index);
564        let header = SegmentIndexHeader {
565            version: 1.into(),
566            len: (segment_index.len() as u64).into(),
567            checksum: checksum.into(),
568            magic: LIBSQL_MAGIC.into(),
569        };
570
571        let mut bytes =
572            BytesMut::with_capacity(size_of::<SegmentIndexHeader>() + segment_index.len());
573        bytes.extend_from_slice(header.as_bytes());
574        bytes.extend_from_slice(&segment_index);
575
576        let body = ByteStream::from(bytes.freeze());
577
578        self.s3_put(config, s3_index_key, body).await?;
579
580        Ok(())
581    }
582
583    async fn meta(
584        &self,
585        config: &Self::Config,
586        namespace: &NamespaceName,
587    ) -> Result<super::DbMeta> {
588        let folder_key = FolderKey {
589            cluster_id: &config.cluster_id,
590            namespace: &namespace,
591        };
592
593        // request a key bigger than any other to get the last segment
594        let max_segment_key = self
595            .find_segment_by_frame_no(config, &folder_key, u64::MAX)
596            .await?;
597
598        Ok(super::DbMeta {
599            max_frame_no: max_segment_key.map(|s| s.end_frame_no).unwrap_or(0),
600        })
601    }
602
603    fn default_config(&self) -> Self::Config {
604        self.default_config.clone()
605    }
606
607    async fn restore(
608        &self,
609        config: &Self::Config,
610        namespace: &NamespaceName,
611        restore_options: RestoreOptions,
612        dest: impl FileExt,
613    ) -> Result<()> {
614        match restore_options {
615            RestoreOptions::Latest => self.restore_latest(config, &namespace, dest).await,
616            RestoreOptions::Timestamp(_) => todo!(),
617        }
618    }
619
620    async fn find_segment(
621        &self,
622        config: &Self::Config,
623        namespace: &NamespaceName,
624        req: FindSegmentReq,
625    ) -> Result<SegmentKey> {
626        let folder_key = FolderKey {
627            cluster_id: &config.cluster_id,
628            namespace: &namespace,
629        };
630
631        match req {
632            FindSegmentReq::EndFrameNoLessThan(frame_no) => self
633                .find_segment_by_frame_no(config, &folder_key, frame_no)
634                .await?
635                .ok_or_else(|| Error::SegmentNotFound(req)),
636            FindSegmentReq::Timestamp(ts) => self
637                .find_segment_by_timestamp(config, &folder_key, ts)
638                .await?
639                .ok_or_else(|| Error::SegmentNotFound(req)),
640        }
641    }
642
643    async fn fetch_segment_index(
644        &self,
645        config: &Self::Config,
646        namespace: &NamespaceName,
647        key: &SegmentKey,
648    ) -> Result<fst::Map<Arc<[u8]>>> {
649        let folder_key = FolderKey {
650            cluster_id: &config.cluster_id,
651            namespace: &namespace,
652        };
653        self.fetch_segment_index_inner(config, &folder_key, key)
654            .await
655    }
656
657    async fn fetch_segment_data_to_file(
658        &self,
659        config: &Self::Config,
660        namespace: &NamespaceName,
661        key: &SegmentKey,
662        file: &impl FileExt,
663    ) -> Result<CompactedSegmentDataHeader> {
664        let folder_key = FolderKey {
665            cluster_id: &config.cluster_id,
666            namespace: &namespace,
667        };
668        let header = self
669            .fetch_segment_data_inner(config, &folder_key, key, file)
670            .await?;
671        Ok(header)
672    }
673
674    async fn fetch_segment_data(
675        self: Arc<Self>,
676        config: Self::Config,
677        namespace: NamespaceName,
678        key: SegmentKey,
679    ) -> Result<impl FileExt> {
680        let file = self.io.tempfile()?;
681        self.fetch_segment_data_to_file(&config, &namespace, &key, &file)
682            .await?;
683        Ok(file)
684    }
685
686    fn list_segments<'a>(
687        &'a self,
688        config: Self::Config,
689        namespace: &'a NamespaceName,
690        until: u64,
691    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
692        self.list_segments_inner(config, namespace, until)
693    }
694}
695
/// Progress of a file-streaming body through its file.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StreamState {
    /// No read in flight; the next poll starts a read at the current offset.
    Init,
    /// A chunk read is in flight and is being polled.
    WaitingChunk,
    /// End of file was reached or a read failed; no more frames are produced.
    Done,
}
702
703struct FileStreamBody<F> {
704    inner: Arc<F>,
705    current_offset: u64,
706    chunk_size: usize,
707    state: StreamState,
708    fut: ReusableBoxFuture<'static, std::io::Result<Bytes>>,
709}
710
711impl<F> FileStreamBody<F> {
712    fn new(inner: F) -> Self {
713        Self::new_inner(inner.into())
714    }
715
716    fn new_inner(inner: Arc<F>) -> Self {
717        Self {
718            inner,
719            current_offset: 0,
720            chunk_size: 4096,
721            state: StreamState::Init,
722            fut: ReusableBoxFuture::new(std::future::pending()),
723        }
724    }
725
726    fn into_byte_stream(self) -> ByteStream
727    where
728        F: FileExt,
729    {
730        let body = SdkBody::retryable(move || {
731            let s = Self::new_inner(self.inner.clone());
732            SdkBody::from_body_1_x(s)
733        });
734
735        ByteStream::new(body)
736    }
737}
738
739impl<F> http_body::Body for FileStreamBody<F>
740where
741    F: FileExt,
742{
743    type Data = Bytes;
744    type Error = std::io::Error;
745
746    fn poll_frame(
747        mut self: Pin<&mut Self>,
748        cx: &mut std::task::Context<'_>,
749    ) -> Poll<Option<Result<HttpFrame<Self::Data>, Self::Error>>> {
750        loop {
751            match self.state {
752                StreamState::Init => {
753                    let f = self.inner.clone();
754                    let chunk_size = self.chunk_size;
755                    let current_offset = self.current_offset;
756                    let fut = async move {
757                        let buf = BytesMut::with_capacity(chunk_size);
758                        let (buf, ret) = f.read_at_async(buf, current_offset).await;
759                        ret.map(|_| buf.freeze())
760                    };
761                    self.fut.set(fut);
762                    self.state = StreamState::WaitingChunk;
763                }
764                StreamState::WaitingChunk => match self.fut.poll(cx) {
765                    Poll::Ready(Ok(buf)) => {
766                        // TODO: we perform one too many read,
767                        if buf.is_empty() {
768                            self.state = StreamState::Done;
769                            return Poll::Ready(None);
770                        } else {
771                            self.state = StreamState::Init;
772                            self.current_offset += buf.len() as u64;
773                            return Poll::Ready(Some(Ok(HttpFrame::data(buf))));
774                        }
775                    }
776                    Poll::Ready(Err(e)) => {
777                        self.state = StreamState::Done;
778                        return Poll::Ready(Some(Err(e)));
779                    }
780                    Poll::Pending => return Poll::Pending,
781                },
782                StreamState::Done => return Poll::Ready(None),
783            }
784        }
785    }
786
787    fn size_hint(&self) -> SizeHint {
788        match self.inner.len() {
789            Ok(n) => SizeHint::with_exact(n),
790            Err(_) => SizeHint::new(),
791        }
792    }
793}
794
#[cfg(test)]
mod tests {
    use std::path::Path;

    use aws_config::{BehaviorVersion, Region, SdkConfig};
    use aws_sdk_s3::config::SharedCredentialsProvider;
    use chrono::Utc;
    use fst::MapBuilder;
    use s3s::auth::SimpleAuth;
    use s3s::service::{S3ServiceBuilder, SharedS3Service};
    use uuid::Uuid;

    use crate::io::StdIO;

    use super::*;

    /// Start an in-process S3 service backed by the filesystem under `dir`. Returns an SDK
    /// config whose HTTP client routes requests straight to that service, plus the service
    /// handle.
    #[track_caller]
    fn setup(dir: impl AsRef<Path>) -> (SdkConfig, SharedS3Service) {
        std::fs::create_dir_all(&dir).unwrap();
        let fs_store = s3s_fs::FileSystem::new(dir).unwrap();

        let credentials = aws_credential_types::Credentials::for_tests();

        let mut service_builder = S3ServiceBuilder::new(fs_store);
        service_builder.set_auth(SimpleAuth::from_single(
            credentials.access_key_id(),
            credentials.secret_access_key(),
        ));
        service_builder.set_base_domain("localhost:8014");
        let service = service_builder.build().into_shared();

        let http_client = s3s_aws::Client::from(service.clone());

        let sdk_config = aws_config::SdkConfig::builder()
            .http_client(http_client)
            .behavior_version(BehaviorVersion::latest())
            .region(Region::from_static("us-west-2"))
            .credentials_provider(SharedCredentialsProvider::new(credentials))
            .endpoint_url("http://localhost:8014")
            .build();

        (sdk_config, service)
    }

    /// Store one segment of `ns` spanning `start_frame_no` to `end_frame_no`. Its data is the
    /// file at `data_path`, and its index maps `index_entry` (big-endian) to itself.
    async fn store_segment(
        storage: &S3Backend<StdIO>,
        config: &Arc<S3Config>,
        ns: &NamespaceName,
        data_path: &Path,
        start_frame_no: u64,
        end_frame_no: u64,
        index_entry: u32,
    ) {
        let mut index_builder = MapBuilder::memory();
        index_builder
            .insert(index_entry.to_be_bytes(), u64::from(index_entry))
            .unwrap();
        let index = index_builder.into_inner().unwrap();

        let meta = SegmentMeta {
            namespace: ns.clone(),
            segment_id: Uuid::new_v4(),
            start_frame_no: start_frame_no.into(),
            end_frame_no: end_frame_no.into(),
            segment_timestamp: Utc::now(),
        };

        storage
            .store(config, meta, std::fs::File::open(data_path).unwrap(), index)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn s3_basic() {
        let _ = tracing_subscriber::fmt::try_init();
        let dir = tempfile::tempdir().unwrap();
        let (aws_config, _s3) = setup(&dir);

        let s3_config = Arc::new(S3Config {
            bucket: "testbucket".into(),
            aws_config: aws_config.clone(),
            cluster_id: "123456789".into(),
        });

        let storage = S3Backend::from_sdk_config_with_io(
            aws_config,
            "testbucket".into(),
            "123456789".into(),
            StdIO(()),
        )
        .await
        .unwrap();

        let segment_path = dir.path().join("fs-segments");
        std::fs::write(&segment_path, vec![123; 8092]).unwrap();

        let ns = NamespaceName::from_string("foobarbaz".into());

        // After each store, the metadata must follow the newest segment.
        store_segment(&storage, &s3_config, &ns, &segment_path, 1, 64, 42).await;
        assert_eq!(storage.meta(&s3_config, &ns).await.unwrap().max_frame_no, 64);

        store_segment(&storage, &s3_config, &ns, &segment_path, 64, 128, 44).await;
        assert_eq!(storage.meta(&s3_config, &ns).await.unwrap().max_frame_no, 128);

        let key = storage
            .find_segment(&s3_config, &ns, FindSegmentReq::EndFrameNoLessThan(65))
            .await
            .unwrap();
        assert_eq!(key.start_frame_no, 1);
        assert_eq!(key.end_frame_no, 64);

        let index = storage
            .fetch_segment_index(&s3_config, &ns, &key)
            .await
            .unwrap();
        assert_eq!(index.get(42u32.to_be_bytes()).unwrap(), 42);
    }
}
922}