sea_streamer_file/
dyn_file.rs1use std::{future::Future, pin::Pin};
2
3use crate::{
4 AsyncFile, ByteSource, Bytes, FileErr, FileId, FileReader, FileReaderFuture, FileSource,
5 FileSourceFuture, ReadFrom,
6};
7use sea_streamer_types::{export::futures::FutureExt, SeqPos};
8
9pub enum DynFileSource {
12 FileReader(FileReader),
13 FileSource(FileSource),
14 Dead,
16}
17
18pub enum FileSourceType {
19 FileReader,
20 FileSource,
21}
22
23pub enum DynReadFuture<'a> {
24 FileReader(FileReaderFuture<'a>),
25 FileSource(FileSourceFuture<'a>),
26}
27
28impl DynFileSource {
29 pub async fn new(file_id: FileId, stype: FileSourceType) -> Result<Self, FileErr> {
30 match stype {
31 FileSourceType::FileReader => Ok(Self::FileReader(FileReader::new(file_id).await?)),
32 FileSourceType::FileSource => Ok(Self::FileSource(
33 FileSource::new(file_id, ReadFrom::Beginning).await?,
34 )),
35 }
36 }
37
38 pub async fn seek(&mut self, to: SeqPos) -> Result<u64, FileErr> {
39 match self {
40 Self::FileReader(file) => file.seek(to).await,
41 Self::FileSource(file) => file.seek(to).await,
42 Self::Dead => panic!("DynFileSource: Dead"),
43 }
44 }
45
46 pub fn source_type(&self) -> FileSourceType {
47 match self {
48 Self::FileReader(_) => FileSourceType::FileReader,
49 Self::FileSource(_) => FileSourceType::FileSource,
50 Self::Dead => panic!("DynFileSource: Dead"),
51 }
52 }
53
54 pub async fn switch_to(self, stype: FileSourceType) -> Result<Self, FileErr> {
58 match (self, stype) {
59 (Self::Dead, _) => panic!("DynFileSource: Dead"),
60 (Self::FileReader(file), FileSourceType::FileSource) => {
61 let (file, offset, buffer) = file.end();
62 Ok(Self::FileSource(FileSource::new_with(
63 file, offset, buffer,
64 )?))
65 }
66 (Self::FileSource(mut src), FileSourceType::FileReader) => {
67 let (file, _, _, buffer) = src.end().await;
68 Ok(Self::FileReader(FileReader::new_with(
69 file,
70 src.offset(),
71 buffer,
72 )?))
73 }
74 (myself, _) => Ok(myself),
75 }
76 }
77
78 pub async fn end(self) -> AsyncFile {
79 match self {
80 Self::Dead => panic!("DynFileSource: Dead"),
81 Self::FileReader(file) => {
82 let (file, _, _) = file.end();
83 file
84 }
85 Self::FileSource(mut src) => {
86 let (file, _, _, _) = src.end().await;
87 file
88 }
89 }
90 }
91
92 #[inline]
93 pub fn offset(&self) -> u64 {
94 match self {
95 Self::FileReader(file) => file.offset(),
96 Self::FileSource(file) => file.offset(),
97 Self::Dead => panic!("DynFileSource: Dead"),
98 }
99 }
100
101 #[inline]
102 pub fn file_size(&self) -> u64 {
103 match self {
104 Self::FileReader(file) => file.file_size(),
105 Self::FileSource(file) => file.file_size(),
106 Self::Dead => panic!("DynFileSource: Dead"),
107 }
108 }
109
110 #[inline]
111 pub(crate) async fn resize(&mut self) -> Result<u64, FileErr> {
112 match self {
113 Self::FileReader(file) => file.resize().await,
114 Self::FileSource(_) => panic!("DynFileSource: FileSource cannot be resized"),
115 Self::Dead => panic!("DynFileSource: Dead"),
116 }
117 }
118
119 pub fn is_dead(&self) -> bool {
120 matches!(self, Self::Dead)
121 }
122}
123
124impl ByteSource for DynFileSource {
125 type Future<'a> = DynReadFuture<'a>;
126
127 fn request_bytes(&mut self, size: usize) -> Self::Future<'_> {
128 match self {
129 Self::FileReader(file) => DynReadFuture::FileReader(file.request_bytes(size)),
130 Self::FileSource(file) => DynReadFuture::FileSource(file.request_bytes(size)),
131 Self::Dead => panic!("DynFileSource: Dead"),
132 }
133 }
134}
135
136impl<'a> Future for DynReadFuture<'a> {
137 type Output = Result<Bytes, FileErr>;
138
139 fn poll(
140 self: std::pin::Pin<&mut Self>,
141 cx: &mut std::task::Context<'_>,
142 ) -> std::task::Poll<Self::Output> {
143 use std::task::Poll::{Pending, Ready};
144
145 match Pin::into_inner(self) {
146 Self::FileReader(fut) => match Pin::new(fut).poll_unpin(cx) {
147 Ready(res) => Ready(res),
148 Pending => Pending,
149 },
150 Self::FileSource(fut) => match Pin::new(fut).poll_unpin(cx) {
151 Ready(res) => Ready(res),
152 Pending => Pending,
153 },
154 }
155 }
156}