Skip to main content

twapi_v2/
upload_v2.rs

1use std::{io::Cursor, path::PathBuf};
2
3use tokio::{
4    fs::{File, metadata},
5    io::{AsyncReadExt, BufReader},
6};
7
8use crate::{
9    api::{
10        Authentication, TwapiOptions,
11        get_2_media_upload::{self, Command},
12        post_2_media_upload_id_append, post_2_media_upload_id_finalize,
13        post_2_media_upload_initialize::{self, MediaCategory},
14    },
15    error::Error,
16    headers::Headers,
17    responses::processing_info::{ProcessingInfo, State},
18};
19
20async fn get_file_size(path: &PathBuf) -> Result<u64, Error> {
21    let metadata = metadata(path).await?;
22    Ok(metadata.len())
23}
24
25pub async fn upload_media(
26    path: &PathBuf,
27    media_type: &str,
28    media_category: Option<MediaCategory>,
29    additional_owners: Vec<String>,
30    authentication: &impl Authentication,
31    twapi_options: Option<&TwapiOptions>,
32) -> Result<(post_2_media_upload_id_finalize::Response, Headers), Error> {
33    // INIT
34    let file_size = get_file_size(path).await?;
35    let media_id = execute_init(
36        file_size,
37        media_type,
38        media_category,
39        additional_owners,
40        authentication,
41        twapi_options,
42    )
43    .await?;
44    tracing::info!(media_id = media_id, "post_media_upload_init");
45
46    // APPEND
47    execute_append(path, authentication, file_size, &media_id, twapi_options).await?;
48
49    // FINALIZE
50    let mut api = post_2_media_upload_id_finalize::Api::new(&media_id);
51    if let Some(twapi_options) = twapi_options {
52        api = api.twapi_options(twapi_options.clone());
53    }
54    let res = api.execute(authentication).await;
55    tracing::info!(media_id = media_id, "post_media_upload_finalize");
56    res
57}
58
59pub async fn upload_media_from_bytes(
60    data: &[u8],
61    media_type: &str,
62    media_category: Option<MediaCategory>,
63    additional_owners: Vec<String>,
64    authentication: &impl Authentication,
65    twapi_options: Option<&TwapiOptions>,
66) -> Result<(post_2_media_upload_id_finalize::Response, Headers), Error> {
67    // INIT
68    let file_size = data.len() as u64;
69    let media_id = execute_init(
70        file_size,
71        media_type,
72        media_category,
73        additional_owners,
74        authentication,
75        twapi_options,
76    )
77    .await?;
78    tracing::info!(media_id = media_id, "post_media_upload_init");
79
80    // APPEND
81    execute_append_from_bytes(data, authentication, file_size, &media_id, twapi_options).await?;
82
83    // FINALIZE
84    let mut api = post_2_media_upload_id_finalize::Api::new(&media_id);
85    if let Some(twapi_options) = twapi_options {
86        api = api.twapi_options(twapi_options.clone());
87    }
88    let res = api.execute(authentication).await;
89    tracing::info!(media_id = media_id, "post_media_upload_finalize");
90    res
91}
92
93async fn execute_init(
94    file_size: u64,
95    media_type: &str,
96    media_category: Option<MediaCategory>,
97    additional_owners: Vec<String>,
98    authentication: &impl Authentication,
99    twapi_options: Option<&TwapiOptions>,
100) -> Result<String, Error> {
101    let body = post_2_media_upload_initialize::Body {
102        total_bytes: file_size,
103        media_type: media_type.to_owned(),
104        media_category,
105        additional_owners,
106    };
107    let mut api = post_2_media_upload_initialize::Api::new(body);
108    if let Some(twapi_options) = twapi_options {
109        api = api.twapi_options(twapi_options.clone());
110    }
111    let (response, _) = api.execute(authentication).await?;
112    let media_id = response.data.and_then(|it| it.id).unwrap_or_default();
113    Ok(media_id)
114}
115
116pub fn get_media_id(response: &post_2_media_upload_id_finalize::Response) -> String {
117    let Some(data) = &response.data else {
118        return "".to_owned();
119    };
120    data.id.clone().unwrap_or_default()
121}
122
123async fn execute_append(
124    path: &PathBuf,
125    authentication: &impl Authentication,
126    file_size: u64,
127    media_id: &str,
128    twapi_options: Option<&TwapiOptions>,
129) -> Result<(), Error> {
130    let mut segment_index = 0;
131    let f = File::open(path).await?;
132    let mut reader = BufReader::new(f);
133    while segment_index * 5000000 < file_size {
134        let read_size: usize = if (segment_index + 1) * 5000000 < file_size {
135            5000000
136        } else {
137            (file_size - segment_index * 5000000) as usize
138        };
139        let mut cursor = Cursor::new(vec![0; read_size]);
140        reader.read_exact(cursor.get_mut()).await?;
141        let form = post_2_media_upload_id_append::FormData {
142            segment_index,
143            cursor,
144        };
145        let mut api = post_2_media_upload_id_append::Api::new(media_id, form);
146        if let Some(twapi_options) = twapi_options {
147            api = api.twapi_options(twapi_options.clone());
148        }
149        let _ = api.execute(authentication).await?;
150        tracing::info!(
151            segment_index = segment_index,
152            media_id = media_id,
153            "post_media_upload_append"
154        );
155        segment_index += 1;
156    }
157    Ok(())
158}
159
160async fn execute_append_from_bytes(
161    data: &[u8],
162    authentication: &impl Authentication,
163    file_size: u64,
164    media_id: &str,
165    twapi_options: Option<&TwapiOptions>,
166) -> Result<(), Error> {
167    let mut segment_index = 0;
168    while segment_index * 5000000 < file_size {
169        let start_pos = segment_index as usize * 5000000;
170        let remaining_bytes = file_size as usize - start_pos;
171        let read_size = std::cmp::min(5000000, remaining_bytes);
172
173        let data_slice = &data[start_pos..start_pos + read_size];
174        let cursor = Cursor::new(data_slice.to_owned());
175
176        let form = post_2_media_upload_id_append::FormData {
177            segment_index,
178            cursor,
179        };
180        let mut api = post_2_media_upload_id_append::Api::new(media_id, form);
181        if let Some(twapi_options) = twapi_options {
182            api = api.twapi_options(twapi_options.clone());
183        }
184        let _ = api.execute(authentication).await?;
185        tracing::info!(
186            segment_index = segment_index,
187            media_id = media_id,
188            "post_media_upload_append"
189        );
190        segment_index += 1;
191    }
192    Ok(())
193}
194
195fn get_check_after_secs(processing_info: &Option<ProcessingInfo>) -> Option<i64> {
196    let Some(processing_info) = processing_info else {
197        return None;
198    };
199    let state = &(processing_info.state.clone()?);
200    match state {
201        State::Pending | State::InProgress => processing_info.check_after_secs,
202        _ => None,
203    }
204}
205
206fn calc_progress_percent(res: &get_2_media_upload::Response) -> i64 {
207    let Some(data) = &res.data else {
208        return 0;
209    };
210    let Some(ref processing_info) = data.processing_info else {
211        return 0;
212    };
213    processing_info.progress_percent.unwrap_or(0)
214}
215
216pub async fn check_processing(
217    response: post_2_media_upload_id_finalize::Response,
218    authentication: &impl Authentication,
219    f: Option<impl Fn(i64, &get_2_media_upload::Response, &Headers) -> Result<(), Error>>,
220    twapi_options: Option<&TwapiOptions>,
221) -> Result<(), Error> {
222    let Some(data) = response.data else {
223        return Err(Error::Other("data not found".to_owned(), None));
224    };
225    let Some(media_id) = data.id else {
226        return Err(Error::Other("media_id not found".to_owned(), None));
227    };
228    let mut processing_info = data.processing_info;
229    let mut count = 0;
230    loop {
231        let Some(check_after_secs) = get_check_after_secs(&processing_info) else {
232            return Ok(());
233        };
234        tokio::time::sleep(std::time::Duration::from_secs(check_after_secs as u64)).await;
235        let mut api = get_2_media_upload::Api::new(&media_id, Command::Status);
236        if let Some(twapi_options) = twapi_options {
237            api = api.twapi_options(twapi_options.clone());
238        }
239        let (res, header) = api.execute(authentication).await?;
240        tracing::info!(
241            count = count,
242            media_id = media_id,
243            progress_percent = calc_progress_percent(&res),
244            "get_media_upload"
245        );
246        if let Some(ref f) = f {
247            f(count, &res, &header)?;
248        }
249        processing_info = res.data.and_then(|it| it.processing_info);
250        count += 1;
251    }
252}