qiniu_upload_manager/
callbacks.rs1use super::UploadedPart;
2use anyhow::Result as AnyResult;
3use qiniu_apis::{
4 http::{ResponseParts, TransferProgressInfo},
5 http_client::{RequestBuilderParts, Response, ResponseError},
6};
7use std::{
8 fmt::{self, Debug},
9 sync::Arc,
10};
11
12type BeforeRequestCallback<'c> = Arc<dyn Fn(&mut RequestBuilderParts<'_>) -> AnyResult<()> + Send + Sync + 'c>;
13type UploadProgressCallback<'c> = Arc<dyn Fn(&UploadingProgressInfo) -> AnyResult<()> + Send + Sync + 'c>;
14type PartUploadedCallback<'c> = Arc<dyn Fn(&dyn UploadedPart) -> AnyResult<()> + Send + Sync + 'c>;
15type AfterResponseOkCallback<'c> = Arc<dyn Fn(&mut ResponseParts) -> AnyResult<()> + Send + Sync + 'c>;
16type AfterResponseErrorCallback<'c> = Arc<dyn Fn(&mut ResponseError) -> AnyResult<()> + Send + Sync + 'c>;
17
18pub trait UploaderWithCallbacks {
20 fn on_before_request<F: Fn(&mut RequestBuilderParts<'_>) -> AnyResult<()> + Send + Sync + 'static>(
22 &mut self,
23 callback: F,
24 ) -> &mut Self;
25
26 fn on_upload_progress<F: Fn(&UploadingProgressInfo) -> AnyResult<()> + Send + Sync + 'static>(
28 &mut self,
29 callback: F,
30 ) -> &mut Self;
31
32 fn on_response_ok<F: Fn(&mut ResponseParts) -> AnyResult<()> + Send + Sync + 'static>(
34 &mut self,
35 callback: F,
36 ) -> &mut Self;
37
38 fn on_response_error<F: Fn(&mut ResponseError) -> AnyResult<()> + Send + Sync + 'static>(
40 &mut self,
41 callback: F,
42 ) -> &mut Self;
43}
44
45pub trait MultiPartsUploaderWithCallbacks: UploaderWithCallbacks {
47 fn on_part_uploaded<F: Fn(&dyn UploadedPart) -> AnyResult<()> + Send + Sync + 'static>(
49 &mut self,
50 callback: F,
51 ) -> &mut Self;
52}
53
54#[derive(Default, Clone)]
55pub(super) struct Callbacks<'a> {
56 before_request_callbacks: Vec<BeforeRequestCallback<'a>>,
57 upload_progress_callbacks: Vec<UploadProgressCallback<'a>>,
58 part_uploaded_callbacks: Vec<PartUploadedCallback<'a>>,
59 after_response_ok_callbacks: Vec<AfterResponseOkCallback<'a>>,
60 after_response_error_callbacks: Vec<AfterResponseErrorCallback<'a>>,
61}
62
63impl<'a> Callbacks<'a> {
64 pub(super) fn insert_before_request_callback(
65 &mut self,
66 callback: impl Fn(&mut RequestBuilderParts<'_>) -> AnyResult<()> + Send + Sync + 'a,
67 ) -> &mut Self {
68 self.before_request_callbacks.push(Arc::new(callback));
69 self
70 }
71
72 pub(super) fn insert_upload_progress_callback(
73 &mut self,
74 callback: impl Fn(&UploadingProgressInfo) -> AnyResult<()> + Send + Sync + 'a,
75 ) -> &mut Self {
76 self.upload_progress_callbacks.push(Arc::new(callback));
77 self
78 }
79
80 pub(super) fn insert_part_uploaded_callback(
81 &mut self,
82 callback: impl Fn(&dyn UploadedPart) -> AnyResult<()> + Send + Sync + 'a,
83 ) -> &mut Self {
84 self.part_uploaded_callbacks.push(Arc::new(callback));
85 self
86 }
87
88 pub(super) fn insert_after_response_ok_callback(
89 &mut self,
90 callback: impl Fn(&mut ResponseParts) -> AnyResult<()> + Send + Sync + 'a,
91 ) -> &mut Self {
92 self.after_response_ok_callbacks.push(Arc::new(callback));
93 self
94 }
95
96 pub(super) fn insert_after_response_error_callback(
97 &mut self,
98 callback: impl Fn(&mut ResponseError) -> AnyResult<()> + Send + Sync + 'a,
99 ) -> &mut Self {
100 self.after_response_error_callbacks.push(Arc::new(callback));
101 self
102 }
103
104 pub(super) fn before_request(&self, builder_parts: &mut RequestBuilderParts) -> AnyResult<()> {
105 for callback in self.before_request_callbacks.iter() {
106 callback(builder_parts)?;
107 }
108 Ok(())
109 }
110
111 pub(super) fn upload_progress(&self, progress_info: &UploadingProgressInfo) -> AnyResult<()> {
112 for callback in self.upload_progress_callbacks.iter() {
113 callback(progress_info)?;
114 }
115 Ok(())
116 }
117
118 pub(super) fn part_uploaded(&self, progress_info: &dyn UploadedPart) -> AnyResult<()> {
119 for callback in self.part_uploaded_callbacks.iter() {
120 callback(progress_info)?;
121 }
122 Ok(())
123 }
124
125 pub(super) fn after_response<B>(&self, result: &mut Result<Response<B>, ResponseError>) -> AnyResult<()> {
126 match result {
127 Ok(response) => self.after_response_ok(response.parts_mut()),
128 Err(err) => self.after_response_error(err),
129 }
130 }
131
132 fn after_response_ok(&self, response_parts: &mut ResponseParts) -> AnyResult<()> {
133 for callback in self.after_response_ok_callbacks.iter() {
134 callback(response_parts)?;
135 }
136 Ok(())
137 }
138
139 fn after_response_error(&self, error: &mut ResponseError) -> AnyResult<()> {
140 for callback in self.after_response_error_callbacks.iter() {
141 callback(error)?;
142 }
143 Ok(())
144 }
145}
146
147impl Debug for Callbacks<'_> {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 f.debug_struct("Callbacks").finish()
150 }
151}
152
153#[derive(Debug, Clone, Copy)]
155pub struct UploadingProgressInfo {
156 transferred_bytes: u64,
157 total_bytes: Option<u64>,
158}
159
160impl UploadingProgressInfo {
161 #[inline]
163 pub fn new(transferred_bytes: u64, total_bytes: Option<u64>) -> Self {
164 Self {
165 transferred_bytes,
166 total_bytes,
167 }
168 }
169
170 #[inline]
172 pub fn transferred_bytes(&self) -> u64 {
173 self.transferred_bytes
174 }
175
176 #[inline]
178 pub fn total_bytes(&self) -> Option<u64> {
179 self.total_bytes
180 }
181}
182
183impl<'a> From<&'a TransferProgressInfo<'a>> for UploadingProgressInfo {
184 #[inline]
185 fn from(t: &'a TransferProgressInfo<'a>) -> Self {
186 Self::new(t.transferred_bytes(), Some(t.total_bytes()))
187 }
188}
189
190impl From<TransferProgressInfo<'_>> for UploadingProgressInfo {
191 #[inline]
192 fn from(t: TransferProgressInfo<'_>) -> Self {
193 Self::new(t.transferred_bytes(), Some(t.total_bytes()))
194 }
195}