1use std::{io::Cursor, path::PathBuf};
2
3use tokio::{
4 fs::{metadata, File},
5 io::{AsyncReadExt, BufReader},
6};
7
8use crate::{
9 api::{
10 get_2_media_upload::{self, Command},
11 post_2_media_upload_append, post_2_media_upload_finalize,
12 post_2_media_upload_init::{self, MediaCategory},
13 Authentication, TwapiOptions,
14 },
15 error::Error,
16 headers::Headers,
17 responses::processing_info::{ProcessingInfo, State},
18};
19
20async fn get_file_size(path: &PathBuf) -> Result<u64, Error> {
21 let metadata = metadata(path).await?;
22 Ok(metadata.len())
23}
24
25pub async fn upload_media(
26 path: &PathBuf,
27 media_type: &str,
28 media_category: Option<MediaCategory>,
29 additional_owners: Option<String>,
30 authentication: &impl Authentication,
31 twapi_options: Option<&TwapiOptions>,
32) -> Result<(post_2_media_upload_finalize::Response, Headers), Error> {
33 let file_size = get_file_size(path).await?;
35 let media_id = execute_init(
36 file_size,
37 media_type,
38 media_category,
39 additional_owners,
40 authentication,
41 twapi_options,
42 )
43 .await?;
44 tracing::info!(media_id = media_id, "post_media_upload_init");
45
46 execute_append(path, authentication, file_size, &media_id, twapi_options).await?;
48
49 let data = post_2_media_upload_finalize::FormData {
51 media_id: media_id.clone(),
52 };
53 let mut api = post_2_media_upload_finalize::Api::new(data);
54 if let Some(twapi_options) = twapi_options {
55 api = api.twapi_options(twapi_options.clone());
56 }
57 let res = api.execute(authentication).await;
58 tracing::info!(media_id = media_id, "post_media_upload_finalize");
59 res
60}
61
62async fn execute_init(
63 file_size: u64,
64 media_type: &str,
65 media_category: Option<MediaCategory>,
66 additional_owners: Option<String>,
67 authentication: &impl Authentication,
68 twapi_options: Option<&TwapiOptions>,
69) -> Result<String, Error> {
70 let data = post_2_media_upload_init::FormData {
71 total_bytes: file_size,
72 media_type: media_type.to_owned(),
73 media_category,
74 additional_owners,
75 };
76 let mut api = post_2_media_upload_init::Api::new(data);
77 if let Some(twapi_options) = twapi_options {
78 api = api.twapi_options(twapi_options.clone());
79 }
80 let (response, _) = api.execute(authentication).await?;
81 let media_id = response.data.and_then(|it| it.id).unwrap_or_default();
82 Ok(media_id)
83}
84
85pub fn get_media_id(response: &post_2_media_upload_finalize::Response) -> String {
86 let Some(data) = &response.data else {
87 return "".to_owned();
88 };
89 data.id.clone().unwrap_or_default()
90}
91
92async fn execute_append(
93 path: &PathBuf,
94 authentication: &impl Authentication,
95 file_size: u64,
96 media_id: &str,
97 twapi_options: Option<&TwapiOptions>,
98) -> Result<(), Error> {
99 let mut segment_index = 0;
100 let f = File::open(path).await?;
101 let mut reader = BufReader::new(f);
102 while segment_index * 5000000 < file_size {
103 let read_size: usize = if (segment_index + 1) * 5000000 < file_size {
104 5000000
105 } else {
106 (file_size - segment_index * 5000000) as usize
107 };
108 let mut cursor = Cursor::new(vec![0; read_size]);
109 reader.read_exact(cursor.get_mut()).await?;
110 let data = post_2_media_upload_append::FormData {
111 media_id: media_id.to_owned(),
112 segment_index,
113 cursor,
114 };
115 let mut api = post_2_media_upload_append::Api::new(data);
116 if let Some(twapi_options) = twapi_options {
117 api = api.twapi_options(twapi_options.clone());
118 }
119 let _ = api.execute(authentication).await?;
120 tracing::info!(
121 segment_index = segment_index,
122 media_id = media_id,
123 "post_media_upload_append"
124 );
125 segment_index += 1;
126 }
127 Ok(())
128}
129
130fn get_check_after_secs(processing_info: &Option<ProcessingInfo>) -> Option<i64> {
131 let Some(ref processing_info) = processing_info else {
132 return None;
133 };
134 let state = &(processing_info.state.clone()?);
135 match state {
136 State::Pending | State::InProgress => processing_info.check_after_secs,
137 _ => None,
138 }
139}
140
141fn calc_progress_percent(res: &get_2_media_upload::Response) -> i64 {
142 let Some(data) = &res.data else {
143 return 0;
144 };
145 let Some(ref processing_info) = data.processing_info else {
146 return 0;
147 };
148 processing_info.progress_percent.unwrap_or(0)
149}
150
151pub async fn check_processing(
152 response: post_2_media_upload_finalize::Response,
153 authentication: &impl Authentication,
154 f: Option<impl Fn(i64, &get_2_media_upload::Response, &Headers) -> Result<(), Error>>,
155 twapi_options: Option<&TwapiOptions>,
156) -> Result<(), Error> {
157 let Some(data) = response.data else {
158 return Err(Error::Other("data not found".to_owned(), None));
159 };
160 let Some(media_id) = data.id else {
161 return Err(Error::Other("media_id not found".to_owned(), None));
162 };
163 let mut processing_info = data.processing_info;
164 let mut count = 0;
165 loop {
166 let Some(check_after_secs) = get_check_after_secs(&processing_info) else {
167 return Ok(());
168 };
169 tokio::time::sleep(std::time::Duration::from_secs(check_after_secs as u64)).await;
170 let mut api = get_2_media_upload::Api::new(&media_id, Command::Status);
171 if let Some(twapi_options) = twapi_options {
172 api = api.twapi_options(twapi_options.clone());
173 }
174 let (res, header) = api.execute(authentication).await?;
175 tracing::info!(
176 count = count,
177 media_id = media_id,
178 progress_percent = calc_progress_percent(&res),
179 "get_media_upload"
180 );
181 if let Some(ref f) = f {
182 f(count, &res, &header)?;
183 }
184 processing_info = res.data.and_then(|it| it.processing_info);
185 count += 1;
186 }
187}