twapi_v2/
upload.rs

1use std::{
2    io::{BufReader, Cursor, Read},
3    path::PathBuf,
4};
5
6use reqwest::RequestBuilder;
7
8use crate::{
9    api::{make_url_with_prefix, Authentication, TwapiOptions},
10    error::Error,
11    headers::Headers,
12};
13
14use self::{media_category::MediaCategory, response::Response};
15
16pub mod get_media_upload;
17pub mod media_category;
18pub mod post_media_metadata_create;
19pub mod post_media_subtitles_create;
20pub mod post_media_subtitles_delete;
21pub mod post_media_upload_append;
22pub mod post_media_upload_finalize;
23pub mod post_media_upload_init;
24pub mod response;
25
26const POSTFIX_URL: &str = "/1.1/media/upload.json";
27const ENV_KEY: &str = "TWAPI_V2_MEDIA_API_PREFIX_API";
28const PREFIX_URL_MEDIA: &str = "https://upload.twitter.com";
29
30pub fn clear_prefix_url() {
31    std::env::set_var(ENV_KEY, PREFIX_URL_MEDIA);
32}
33
34pub fn setup_prefix_url(url: &str) {
35    std::env::set_var(ENV_KEY, url);
36}
37
38pub(crate) fn make_url(twapi_options: &Option<TwapiOptions>, postfix_url: Option<&str>) -> String {
39    make_url_with_prefix(
40        &std::env::var(ENV_KEY).unwrap_or(PREFIX_URL_MEDIA.to_owned()),
41        twapi_options,
42        postfix_url.unwrap_or(POSTFIX_URL),
43    )
44}
45
46pub(crate) async fn execute_no_response(builder: RequestBuilder) -> Result<Headers, Error> {
47    let response = builder.send().await?;
48    let status_code = response.status();
49    let header = response.headers();
50    let headers = Headers::new(header);
51    if status_code.is_success() {
52        Ok(headers)
53    } else {
54        let body = response.text().await?;
55        Err(Error::Other(body, Some(status_code)))
56    }
57}
58
59pub async fn upload_media(
60    path: &PathBuf,
61    media_type: &str,
62    media_category: Option<MediaCategory>,
63    additional_owners: Option<String>,
64    authentication: &impl Authentication,
65) -> Result<(response::Response, Headers), Error> {
66    // INIT
67    let metadata = std::fs::metadata(path)?;
68    let file_size = metadata.len();
69    let data = post_media_upload_init::Data {
70        total_bytes: file_size,
71        media_type: media_type.to_owned(),
72        media_category,
73        additional_owners,
74    };
75    let (response, _) = post_media_upload_init::Api::new(data)
76        .execute(authentication)
77        .await?;
78    let media_id = response.media_id_string;
79    tracing::info!(media_id = media_id, "post_media_upload_init");
80
81    // APPEND
82    execute_append(path, authentication, file_size, &media_id).await?;
83
84    // FINALIZE
85    let data = post_media_upload_finalize::Data {
86        media_id: media_id.clone(),
87    };
88    let res = post_media_upload_finalize::Api::new(data)
89        .execute(authentication)
90        .await;
91    tracing::info!(media_id = media_id, "post_media_upload_finalize");
92    res
93}
94
95async fn execute_append(
96    path: &PathBuf,
97    authentication: &impl Authentication,
98    file_size: u64,
99    media_id: &str,
100) -> Result<(), Error> {
101    let mut segment_index = 0;
102    let f = std::fs::File::open(path)?;
103    let mut reader = BufReader::new(f);
104    while segment_index * 5000000 < file_size {
105        let read_size: usize = if (segment_index + 1) * 5000000 < file_size {
106            5000000
107        } else {
108            (file_size - segment_index * 5000000) as usize
109        };
110        let mut cursor = Cursor::new(vec![0; read_size]);
111        reader.read_exact(cursor.get_mut())?;
112        let data = post_media_upload_append::Data {
113            media_id: media_id.to_owned(),
114            segment_index,
115            cursor,
116        };
117        let _ = post_media_upload_append::Api::new(data)
118            .execute(authentication)
119            .await?;
120        tracing::info!(
121            segment_index = segment_index,
122            media_id = media_id,
123            "post_media_upload_append"
124        );
125        segment_index += 1;
126    }
127    Ok(())
128}
129
130pub async fn check_processing(
131    response: Response,
132    authentication: &impl Authentication,
133    f: Option<impl Fn(i64, &Response, &Headers) -> Result<(), Error>>,
134) -> Result<(), Error> {
135    let media_id = response.media_id_string.clone();
136    let mut processing_info = response.processing_info;
137    let mut count = 0;
138    loop {
139        let Some(ref info) = processing_info else {
140            return Ok(());
141        };
142        if !info.state.is_continue() {
143            return Ok(());
144        }
145
146        if let Some(check_after_secs) = info.check_after_secs {
147            tokio::time::sleep(std::time::Duration::from_secs(check_after_secs)).await;
148            let (res, header) = get_media_upload::Api::new(media_id.clone())
149                .execute(authentication)
150                .await?;
151            let progress_percent = res
152                .processing_info
153                .as_ref()
154                .map(|it| it.progress_percent.unwrap_or(0))
155                .unwrap_or(0);
156            tracing::info!(
157                count = count,
158                media_id = media_id,
159                progress_percent = progress_percent,
160                "get_media_upload"
161            );
162            if let Some(ref f) = f {
163                f(count, &res, &header)?;
164            }
165            processing_info = res.processing_info;
166        } else {
167            return Err(Error::Upload("check_ofter_secs not found".to_owned()));
168        }
169        count += 1;
170    }
171}