Skip to main content

twapi_v2/
upload.rs

1use std::{
2    io::{BufReader, Cursor, Read},
3    path::PathBuf,
4};
5
6use reqwest::RequestBuilder;
7
8use crate::{
9    api::{Authentication, TwapiOptions, make_url_with_prefix},
10    error::Error,
11    headers::Headers,
12};
13
14use self::{media_category::MediaCategory, response::Response};
15
16pub mod get_media_upload;
17pub mod media_category;
18pub mod post_media_metadata_create;
19pub mod post_media_subtitles_create;
20pub mod post_media_subtitles_delete;
21pub mod post_media_upload_append;
22pub mod post_media_upload_finalize;
23pub mod post_media_upload_init;
24pub mod response;
25
/// Name of the environment variable that overrides the media-upload URL prefix.
const ENV_KEY: &str = "TWAPI_V2_MEDIA_API_PREFIX_API";
/// Default URL prefix used when `ENV_KEY` is not set.
const PREFIX_URL_MEDIA: &str = "https://upload.twitter.com";
/// Default path appended to the prefix when a caller supplies no explicit postfix.
const POSTFIX_URL: &str = "/1.1/media/upload.json";
29
30pub fn clear_prefix_url() {
31    // TODO: Audit that the environment access only happens in single-threaded code.
32    unsafe { std::env::set_var(ENV_KEY, PREFIX_URL_MEDIA) };
33}
34
35pub fn setup_prefix_url(url: &str) {
36    // TODO: Audit that the environment access only happens in single-threaded code.
37    unsafe { std::env::set_var(ENV_KEY, url) };
38}
39
40pub(crate) fn make_url(twapi_options: &Option<TwapiOptions>, postfix_url: Option<&str>) -> String {
41    make_url_with_prefix(
42        &std::env::var(ENV_KEY).unwrap_or(PREFIX_URL_MEDIA.to_owned()),
43        twapi_options,
44        postfix_url.unwrap_or(POSTFIX_URL),
45    )
46}
47
48pub(crate) async fn execute_no_response(builder: RequestBuilder) -> Result<Headers, Error> {
49    let response = builder.send().await?;
50    let status_code = response.status();
51    let header = response.headers();
52    let headers = Headers::new(header);
53    if status_code.is_success() {
54        Ok(headers)
55    } else {
56        let body = response.text().await?;
57        Err(Error::Other(body, Some(status_code)))
58    }
59}
60
61pub async fn upload_media(
62    path: &PathBuf,
63    media_type: &str,
64    media_category: Option<MediaCategory>,
65    additional_owners: Option<String>,
66    authentication: &impl Authentication,
67) -> Result<(response::Response, Headers), Error> {
68    // INIT
69    let metadata = std::fs::metadata(path)?;
70    let file_size = metadata.len();
71    let data = post_media_upload_init::Data {
72        total_bytes: file_size,
73        media_type: media_type.to_owned(),
74        media_category,
75        additional_owners,
76    };
77    let (response, _) = post_media_upload_init::Api::new(data)
78        .execute(authentication)
79        .await?;
80    let media_id = response.media_id_string;
81    tracing::info!(media_id = media_id, "post_media_upload_init");
82
83    // APPEND
84    execute_append(path, authentication, file_size, &media_id).await?;
85
86    // FINALIZE
87    let data = post_media_upload_finalize::Data {
88        media_id: media_id.clone(),
89    };
90    let res = post_media_upload_finalize::Api::new(data)
91        .execute(authentication)
92        .await;
93    tracing::info!(media_id = media_id, "post_media_upload_finalize");
94    res
95}
96
97async fn execute_append(
98    path: &PathBuf,
99    authentication: &impl Authentication,
100    file_size: u64,
101    media_id: &str,
102) -> Result<(), Error> {
103    let mut segment_index = 0;
104    let f = std::fs::File::open(path)?;
105    let mut reader = BufReader::new(f);
106    while segment_index * 5000000 < file_size {
107        let read_size: usize = if (segment_index + 1) * 5000000 < file_size {
108            5000000
109        } else {
110            (file_size - segment_index * 5000000) as usize
111        };
112        let mut cursor = Cursor::new(vec![0; read_size]);
113        reader.read_exact(cursor.get_mut())?;
114        let data = post_media_upload_append::Data {
115            media_id: media_id.to_owned(),
116            segment_index,
117            cursor,
118        };
119        let _ = post_media_upload_append::Api::new(data)
120            .execute(authentication)
121            .await?;
122        tracing::info!(
123            segment_index = segment_index,
124            media_id = media_id,
125            "post_media_upload_append"
126        );
127        segment_index += 1;
128    }
129    Ok(())
130}
131
132pub async fn check_processing(
133    response: Response,
134    authentication: &impl Authentication,
135    f: Option<impl Fn(i64, &Response, &Headers) -> Result<(), Error>>,
136) -> Result<(), Error> {
137    let media_id = response.media_id_string.clone();
138    let mut processing_info = response.processing_info;
139    let mut count = 0;
140    loop {
141        let Some(ref info) = processing_info else {
142            return Ok(());
143        };
144        if !info.state.is_continue() {
145            return Ok(());
146        }
147
148        if let Some(check_after_secs) = info.check_after_secs {
149            tokio::time::sleep(std::time::Duration::from_secs(check_after_secs)).await;
150            let (res, header) = get_media_upload::Api::new(media_id.clone())
151                .execute(authentication)
152                .await?;
153            let progress_percent = res
154                .processing_info
155                .as_ref()
156                .map(|it| it.progress_percent.unwrap_or(0))
157                .unwrap_or(0);
158            tracing::info!(
159                count = count,
160                media_id = media_id,
161                progress_percent = progress_percent,
162                "get_media_upload"
163            );
164            if let Some(ref f) = f {
165                f(count, &res, &header)?;
166            }
167            processing_info = res.processing_info;
168        } else {
169            return Err(Error::Upload("check_ofter_secs not found".to_owned()));
170        }
171        count += 1;
172    }
173}