1use std::{io::Cursor, path::PathBuf};
2
3use tokio::{
4 fs::{File, metadata},
5 io::{AsyncReadExt, BufReader},
6};
7
8use crate::{
9 api::{
10 Authentication, TwapiOptions,
11 get_2_media_upload::{self, Command},
12 post_2_media_upload_id_append, post_2_media_upload_id_finalize,
13 post_2_media_upload_initialize::{self, MediaCategory},
14 },
15 error::Error,
16 headers::Headers,
17 responses::processing_info::{ProcessingInfo, State},
18};
19
20async fn get_file_size(path: &PathBuf) -> Result<u64, Error> {
21 let metadata = metadata(path).await?;
22 Ok(metadata.len())
23}
24
25pub async fn upload_media(
26 path: &PathBuf,
27 media_type: &str,
28 media_category: Option<MediaCategory>,
29 additional_owners: Vec<String>,
30 authentication: &impl Authentication,
31 twapi_options: Option<&TwapiOptions>,
32) -> Result<(post_2_media_upload_id_finalize::Response, Headers), Error> {
33 let file_size = get_file_size(path).await?;
35 let media_id = execute_init(
36 file_size,
37 media_type,
38 media_category,
39 additional_owners,
40 authentication,
41 twapi_options,
42 )
43 .await?;
44 tracing::info!(media_id = media_id, "post_media_upload_init");
45
46 execute_append(path, authentication, file_size, &media_id, twapi_options).await?;
48
49 let mut api = post_2_media_upload_id_finalize::Api::new(&media_id);
51 if let Some(twapi_options) = twapi_options {
52 api = api.twapi_options(twapi_options.clone());
53 }
54 let res = api.execute(authentication).await;
55 tracing::info!(media_id = media_id, "post_media_upload_finalize");
56 res
57}
58
59pub async fn upload_media_from_bytes(
60 data: &[u8],
61 media_type: &str,
62 media_category: Option<MediaCategory>,
63 additional_owners: Vec<String>,
64 authentication: &impl Authentication,
65 twapi_options: Option<&TwapiOptions>,
66) -> Result<(post_2_media_upload_id_finalize::Response, Headers), Error> {
67 let file_size = data.len() as u64;
69 let media_id = execute_init(
70 file_size,
71 media_type,
72 media_category,
73 additional_owners,
74 authentication,
75 twapi_options,
76 )
77 .await?;
78 tracing::info!(media_id = media_id, "post_media_upload_init");
79
80 execute_append_from_bytes(data, authentication, file_size, &media_id, twapi_options).await?;
82
83 let mut api = post_2_media_upload_id_finalize::Api::new(&media_id);
85 if let Some(twapi_options) = twapi_options {
86 api = api.twapi_options(twapi_options.clone());
87 }
88 let res = api.execute(authentication).await;
89 tracing::info!(media_id = media_id, "post_media_upload_finalize");
90 res
91}
92
93async fn execute_init(
94 file_size: u64,
95 media_type: &str,
96 media_category: Option<MediaCategory>,
97 additional_owners: Vec<String>,
98 authentication: &impl Authentication,
99 twapi_options: Option<&TwapiOptions>,
100) -> Result<String, Error> {
101 let body = post_2_media_upload_initialize::Body {
102 total_bytes: file_size,
103 media_type: media_type.to_owned(),
104 media_category,
105 additional_owners,
106 };
107 let mut api = post_2_media_upload_initialize::Api::new(body);
108 if let Some(twapi_options) = twapi_options {
109 api = api.twapi_options(twapi_options.clone());
110 }
111 let (response, _) = api.execute(authentication).await?;
112 let media_id = response.data.and_then(|it| it.id).unwrap_or_default();
113 Ok(media_id)
114}
115
116pub fn get_media_id(response: &post_2_media_upload_id_finalize::Response) -> String {
117 let Some(data) = &response.data else {
118 return "".to_owned();
119 };
120 data.id.clone().unwrap_or_default()
121}
122
123async fn execute_append(
124 path: &PathBuf,
125 authentication: &impl Authentication,
126 file_size: u64,
127 media_id: &str,
128 twapi_options: Option<&TwapiOptions>,
129) -> Result<(), Error> {
130 let mut segment_index = 0;
131 let f = File::open(path).await?;
132 let mut reader = BufReader::new(f);
133 while segment_index * 5000000 < file_size {
134 let read_size: usize = if (segment_index + 1) * 5000000 < file_size {
135 5000000
136 } else {
137 (file_size - segment_index * 5000000) as usize
138 };
139 let mut cursor = Cursor::new(vec![0; read_size]);
140 reader.read_exact(cursor.get_mut()).await?;
141 let form = post_2_media_upload_id_append::FormData {
142 segment_index,
143 cursor,
144 };
145 let mut api = post_2_media_upload_id_append::Api::new(media_id, form);
146 if let Some(twapi_options) = twapi_options {
147 api = api.twapi_options(twapi_options.clone());
148 }
149 let _ = api.execute(authentication).await?;
150 tracing::info!(
151 segment_index = segment_index,
152 media_id = media_id,
153 "post_media_upload_append"
154 );
155 segment_index += 1;
156 }
157 Ok(())
158}
159
160async fn execute_append_from_bytes(
161 data: &[u8],
162 authentication: &impl Authentication,
163 file_size: u64,
164 media_id: &str,
165 twapi_options: Option<&TwapiOptions>,
166) -> Result<(), Error> {
167 let mut segment_index = 0;
168 while segment_index * 5000000 < file_size {
169 let start_pos = segment_index as usize * 5000000;
170 let remaining_bytes = file_size as usize - start_pos;
171 let read_size = std::cmp::min(5000000, remaining_bytes);
172
173 let data_slice = &data[start_pos..start_pos + read_size];
174 let cursor = Cursor::new(data_slice.to_owned());
175
176 let form = post_2_media_upload_id_append::FormData {
177 segment_index,
178 cursor,
179 };
180 let mut api = post_2_media_upload_id_append::Api::new(media_id, form);
181 if let Some(twapi_options) = twapi_options {
182 api = api.twapi_options(twapi_options.clone());
183 }
184 let _ = api.execute(authentication).await?;
185 tracing::info!(
186 segment_index = segment_index,
187 media_id = media_id,
188 "post_media_upload_append"
189 );
190 segment_index += 1;
191 }
192 Ok(())
193}
194
195fn get_check_after_secs(processing_info: &Option<ProcessingInfo>) -> Option<i64> {
196 let Some(processing_info) = processing_info else {
197 return None;
198 };
199 let state = &(processing_info.state.clone()?);
200 match state {
201 State::Pending | State::InProgress => processing_info.check_after_secs,
202 _ => None,
203 }
204}
205
206fn calc_progress_percent(res: &get_2_media_upload::Response) -> i64 {
207 let Some(data) = &res.data else {
208 return 0;
209 };
210 let Some(ref processing_info) = data.processing_info else {
211 return 0;
212 };
213 processing_info.progress_percent.unwrap_or(0)
214}
215
216pub async fn check_processing(
217 response: post_2_media_upload_id_finalize::Response,
218 authentication: &impl Authentication,
219 f: Option<impl Fn(i64, &get_2_media_upload::Response, &Headers) -> Result<(), Error>>,
220 twapi_options: Option<&TwapiOptions>,
221) -> Result<(), Error> {
222 let Some(data) = response.data else {
223 return Err(Error::Other("data not found".to_owned(), None));
224 };
225 let Some(media_id) = data.id else {
226 return Err(Error::Other("media_id not found".to_owned(), None));
227 };
228 let mut processing_info = data.processing_info;
229 let mut count = 0;
230 loop {
231 let Some(check_after_secs) = get_check_after_secs(&processing_info) else {
232 return Ok(());
233 };
234 tokio::time::sleep(std::time::Duration::from_secs(check_after_secs as u64)).await;
235 let mut api = get_2_media_upload::Api::new(&media_id, Command::Status);
236 if let Some(twapi_options) = twapi_options {
237 api = api.twapi_options(twapi_options.clone());
238 }
239 let (res, header) = api.execute(authentication).await?;
240 tracing::info!(
241 count = count,
242 media_id = media_id,
243 progress_percent = calc_progress_percent(&res),
244 "get_media_upload"
245 );
246 if let Some(ref f) = f {
247 f(count, &res, &header)?;
248 }
249 processing_info = res.data.and_then(|it| it.processing_info);
250 count += 1;
251 }
252}