sea_streamer_file/
dyn_file.rs

1use std::{future::Future, pin::Pin};
2
3use crate::{
4    AsyncFile, ByteSource, Bytes, FileErr, FileId, FileReader, FileReaderFuture, FileSource,
5    FileSourceFuture, ReadFrom,
6};
7use sea_streamer_types::{export::futures::FutureExt, SeqPos};
8
9/// A runtime adapter of `FileReader` and `FileSource`,
10/// also able to switch between the two mode of operations dynamically.
11pub enum DynFileSource {
12    FileReader(FileReader),
13    FileSource(FileSource),
14    /// If you encounter this, it's a programming mistake
15    Dead,
16}
17
/// Selects which kind of source a `DynFileSource` should wrap.
///
/// This is a plain, fieldless tag, so it is cheap to copy and can be
/// compared and printed for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileSourceType {
    /// Use a `FileReader`.
    FileReader,
    /// Use a `FileSource`.
    FileSource,
}
22
23pub enum DynReadFuture<'a> {
24    FileReader(FileReaderFuture<'a>),
25    FileSource(FileSourceFuture<'a>),
26}
27
28impl DynFileSource {
29    pub async fn new(file_id: FileId, stype: FileSourceType) -> Result<Self, FileErr> {
30        match stype {
31            FileSourceType::FileReader => Ok(Self::FileReader(FileReader::new(file_id).await?)),
32            FileSourceType::FileSource => Ok(Self::FileSource(
33                FileSource::new(file_id, ReadFrom::Beginning).await?,
34            )),
35        }
36    }
37
38    pub async fn seek(&mut self, to: SeqPos) -> Result<u64, FileErr> {
39        match self {
40            Self::FileReader(file) => file.seek(to).await,
41            Self::FileSource(file) => file.seek(to).await,
42            Self::Dead => panic!("DynFileSource: Dead"),
43        }
44    }
45
46    pub fn source_type(&self) -> FileSourceType {
47        match self {
48            Self::FileReader(_) => FileSourceType::FileReader,
49            Self::FileSource(_) => FileSourceType::FileSource,
50            Self::Dead => panic!("DynFileSource: Dead"),
51        }
52    }
53
54    /// Switch to a different mode of operation.
55    ///
56    /// Warning: This future must not be canceled.
57    pub async fn switch_to(self, stype: FileSourceType) -> Result<Self, FileErr> {
58        match (self, stype) {
59            (Self::Dead, _) => panic!("DynFileSource: Dead"),
60            (Self::FileReader(file), FileSourceType::FileSource) => {
61                let (file, offset, buffer) = file.end();
62                Ok(Self::FileSource(FileSource::new_with(
63                    file, offset, buffer,
64                )?))
65            }
66            (Self::FileSource(mut src), FileSourceType::FileReader) => {
67                let (file, _, _, buffer) = src.end().await;
68                Ok(Self::FileReader(FileReader::new_with(
69                    file,
70                    src.offset(),
71                    buffer,
72                )?))
73            }
74            (myself, _) => Ok(myself),
75        }
76    }
77
78    pub async fn end(self) -> AsyncFile {
79        match self {
80            Self::Dead => panic!("DynFileSource: Dead"),
81            Self::FileReader(file) => {
82                let (file, _, _) = file.end();
83                file
84            }
85            Self::FileSource(mut src) => {
86                let (file, _, _, _) = src.end().await;
87                file
88            }
89        }
90    }
91
92    #[inline]
93    pub fn offset(&self) -> u64 {
94        match self {
95            Self::FileReader(file) => file.offset(),
96            Self::FileSource(file) => file.offset(),
97            Self::Dead => panic!("DynFileSource: Dead"),
98        }
99    }
100
101    #[inline]
102    pub fn file_size(&self) -> u64 {
103        match self {
104            Self::FileReader(file) => file.file_size(),
105            Self::FileSource(file) => file.file_size(),
106            Self::Dead => panic!("DynFileSource: Dead"),
107        }
108    }
109
110    #[inline]
111    pub(crate) async fn resize(&mut self) -> Result<u64, FileErr> {
112        match self {
113            Self::FileReader(file) => file.resize().await,
114            Self::FileSource(_) => panic!("DynFileSource: FileSource cannot be resized"),
115            Self::Dead => panic!("DynFileSource: Dead"),
116        }
117    }
118
119    pub fn is_dead(&self) -> bool {
120        matches!(self, Self::Dead)
121    }
122}
123
124impl ByteSource for DynFileSource {
125    type Future<'a> = DynReadFuture<'a>;
126
127    fn request_bytes(&mut self, size: usize) -> Self::Future<'_> {
128        match self {
129            Self::FileReader(file) => DynReadFuture::FileReader(file.request_bytes(size)),
130            Self::FileSource(file) => DynReadFuture::FileSource(file.request_bytes(size)),
131            Self::Dead => panic!("DynFileSource: Dead"),
132        }
133    }
134}
135
136impl<'a> Future for DynReadFuture<'a> {
137    type Output = Result<Bytes, FileErr>;
138
139    fn poll(
140        self: std::pin::Pin<&mut Self>,
141        cx: &mut std::task::Context<'_>,
142    ) -> std::task::Poll<Self::Output> {
143        use std::task::Poll::{Pending, Ready};
144
145        match Pin::into_inner(self) {
146            Self::FileReader(fut) => match Pin::new(fut).poll_unpin(cx) {
147                Ready(res) => Ready(res),
148                Pending => Pending,
149            },
150            Self::FileSource(fut) => match Pin::new(fut).poll_unpin(cx) {
151                Ready(res) => Ready(res),
152                Pending => Pending,
153            },
154        }
155    }
156}