tg_flows/types/
input_file.rs1use bytes::{Bytes, BytesMut};
2use once_cell::sync::OnceCell;
3use rc_box::ArcBox;
4use serde::Serialize;
5use takecell::TakeCell;
6use tokio::{
7 io::{AsyncRead, ReadBuf},
8 sync::watch,
9};
10use tokio_util::codec::Decoder;
11
12use std::{borrow::Cow, fmt, io, mem, path::PathBuf, pin::Pin, sync::Arc, task};
13
14use crate::types::InputSticker;
15
16#[derive(Debug, Clone)]
20pub struct InputFile {
21 id: OnceCell<Arc<str>>,
22 file_name: Option<Cow<'static, str>>,
23 inner: InnerFile,
24}
25
26#[derive(Clone)]
27enum InnerFile {
28 Read(Read),
29 File(PathBuf),
30 Bytes(bytes::Bytes),
31 Url(url::Url),
32 FileId(String),
33}
34
35use InnerFile::*;
36
37impl InputFile {
38 #[must_use]
53 pub fn url(url: url::Url) -> Self {
54 Self::new(Url(url))
55 }
56
57 pub fn file_id(file_id: impl Into<String>) -> Self {
74 Self::new(FileId(file_id.into()))
75 }
76
77 pub fn file(path: impl Into<PathBuf>) -> Self {
79 Self::new(File(path.into()))
80 }
81
82 pub fn memory(data: impl Into<bytes::Bytes>) -> Self {
84 Self::new(Bytes(data.into()))
85 }
86
87 pub fn file_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
89 self.file_name = Some(name.into());
90 self
91 }
92
93 pub fn read(it: impl AsyncRead + Send + Unpin + 'static) -> Self {
98 Self::new(Read(Read::new(Arc::new(TakeCell::new(it)))))
99 }
100
101 fn new(inner: InnerFile) -> Self {
104 Self {
105 file_name: None,
106 inner,
107 id: OnceCell::new(),
108 }
109 }
110
111 pub(crate) fn id(&self) -> &str {
115 let random = || Arc::from(&*uuid::Uuid::new_v4().as_simple().encode_lower(&mut [0; 32]));
116 self.id.get_or_init(random)
117 }
118
119 pub(crate) fn needs_attach(&self) -> bool {
122 !matches!(self.inner, Url(_) | FileId(_))
123 }
124
125 pub(crate) fn take(&mut self) -> Self {
130 mem::replace(self, InputFile::file_id(String::new()))
131 }
132
133 fn attach_or_value(&self) -> String {
138 match &self.inner {
139 Url(url) => url.as_str().to_owned(),
140 FileId(file_id) => file_id.clone(),
141 _ => {
142 const PREFIX: &str = "attach://";
143
144 let id = self.id();
145 let mut s = String::with_capacity(PREFIX.len() + id.len());
146 s += PREFIX;
147 s += id;
148
149 s
150 }
151 }
152 }
153
154 fn _take_or_guess_filename(&mut self) -> Cow<'static, str> {
157 self.file_name.take().unwrap_or_else(|| match &self.inner {
158 File(path_to_file) => match path_to_file.file_name() {
159 Some(name) => Cow::Owned(name.to_string_lossy().into_owned()),
160 None => Cow::Borrowed(""),
161 },
162 _ => Cow::Borrowed(""),
163 })
164 }
165}
166
167impl fmt::Debug for InnerFile {
168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169 match self {
170 Read(_) => f.debug_struct("Read").finish_non_exhaustive(),
171 File(path) => f.debug_struct("File").field("path", path).finish(),
172 Bytes(bytes) if f.alternate() => f.debug_tuple("Memory").field(bytes).finish(),
173 Bytes(_) => f.debug_struct("Memory").finish_non_exhaustive(),
174 Url(url) => f.debug_tuple("Url").field(url).finish(),
175 FileId(file_id) => f.debug_tuple("FileId").field(file_id).finish(),
176 }
177 }
178}
179
180impl Serialize for InputFile {
181 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
182 where
183 S: serde::Serializer,
184 {
185 self.attach_or_value().serialize(serializer)
186 }
187}
188
189#[derive(Clone)]
192struct Read {
193 _inner: Arc<TakeCell<dyn AsyncRead + Send + Unpin>>,
194 _buf: Arc<OnceCell<Result<Vec<Bytes>, Arc<io::Error>>>>,
195 _notify: Arc<watch::Sender<()>>,
196 _wait: watch::Receiver<()>,
197}
198
199impl Read {
200 fn new(it: Arc<TakeCell<dyn AsyncRead + Send + Unpin>>) -> Self {
201 let (tx, rx) = watch::channel(());
202
203 Self {
204 _inner: it,
205 _buf: Arc::default(),
206 _notify: Arc::new(tx),
207 _wait: rx,
208 }
209 }
210}
211
212struct ExclusiveArcAsyncRead(ArcBox<TakeCell<dyn AsyncRead + Send + Unpin>>);
214
215impl AsyncRead for ExclusiveArcAsyncRead {
216 fn poll_read(
217 self: Pin<&mut Self>,
218 cx: &mut task::Context<'_>,
219 buf: &mut ReadBuf<'_>,
220 ) -> task::Poll<io::Result<()>> {
221 let Self(inner) = Pin::get_mut(self);
222 let read: &mut (dyn AsyncRead + Unpin) = inner.get();
223 Pin::new(read).poll_read(cx, buf)
224 }
225}
226
227struct BytesDecoder;
228
229impl Decoder for BytesDecoder {
230 type Item = Bytes;
231 type Error = io::Error;
232
233 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
234 if src.is_empty() {
235 return Ok(None);
236 }
237 Ok(Some(src.split().freeze()))
238 }
239}
240
241pub(crate) trait InputFileLike {
245 fn copy_into(&self, into: &mut dyn FnMut(InputFile));
246
247 fn move_into(&mut self, into: &mut dyn FnMut(InputFile));
248}
249
250impl InputFileLike for InputFile {
251 fn copy_into(&self, into: &mut dyn FnMut(InputFile)) {
252 into(self.clone())
253 }
254
255 fn move_into(&mut self, into: &mut dyn FnMut(InputFile)) {
256 into(self.take())
257 }
258}
259
260impl InputFileLike for Option<InputFile> {
261 fn copy_into(&self, into: &mut dyn FnMut(InputFile)) {
262 if let Some(this) = self {
263 this.copy_into(into)
264 }
265 }
266
267 fn move_into(&mut self, into: &mut dyn FnMut(InputFile)) {
268 if let Some(this) = self {
269 this.move_into(into)
270 }
271 }
272}
273
274impl InputFileLike for InputSticker {
275 fn copy_into(&self, into: &mut dyn FnMut(InputFile)) {
276 let (Self::Png(input_file) | Self::Tgs(input_file) | Self::Webm(input_file)) = self;
277
278 input_file.copy_into(into)
279 }
280
281 fn move_into(&mut self, into: &mut dyn FnMut(InputFile)) {
282 let (Self::Png(input_file) | Self::Tgs(input_file) | Self::Webm(input_file)) = self;
283
284 input_file.move_into(into)
285 }
286}