1use crate::UploaderBuilder;
2use log::{error, info, warn};
3use notify::{watcher, DebouncedEvent, RecursiveMode, Result as NotifyResult, Watcher};
4use once_cell::sync::{Lazy, OnceCell};
5use reqwest::blocking::Client as HTTPClient;
6use serde::{Deserialize, Serialize};
7use std::{
8 collections::HashMap,
9 convert::TryInto,
10 env, fmt, fs,
11 io::Result as IOResult,
12 path::{Path, PathBuf},
13 sync::{mpsc::channel, RwLock},
14 thread::{Builder as ThreadBuilder, JoinHandle},
15 time::Duration,
16};
17use tap::prelude::*;
18
19#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
21pub struct Config {
22 #[serde(alias = "ak")]
23 access_key: String,
24 #[serde(alias = "sk")]
25 secret_key: String,
26
27 bucket: String,
28
29 #[serde(alias = "up_hosts")]
30 up_urls: Option<Vec<String>>,
31
32 #[serde(alias = "uc_hosts")]
33 uc_urls: Option<Vec<String>>,
34
35 #[serde(alias = "part")]
36 part_size: Option<u64>,
37
38 retry: Option<usize>,
39 punish_time_s: Option<u64>,
40 base_timeout_ms: Option<u64>,
41 base_timeout_multiple_percents: Option<HashMap<String, u32>>,
42 dial_timeout_ms: Option<u64>,
43}
44
45static QINIU_CONFIG: Lazy<RwLock<Option<Config>>> = Lazy::new(|| {
46 RwLock::new(load_config()).tap(|_| {
47 on_config_updated(|| {
48 if let Some(config) = load_config() {
49 *QINIU_CONFIG.write().unwrap() = Some(config);
50 }
51 info!("QINIU_CONFIG reloaded: {:?}", QINIU_CONFIG);
52 })
53 })
54});
55pub(super) static HTTP_CLIENT: Lazy<RwLock<HTTPClient>> = Lazy::new(|| {
56 RwLock::new(build_http_client()).tap(|_| {
57 on_config_updated(|| {
58 *HTTP_CLIENT.write().unwrap() = build_http_client();
59 info!("HTTP_CLIENT reloaded: {:?}", HTTP_CLIENT);
60 })
61 })
62});
63
64#[inline]
68pub fn is_qiniu_enabled() -> bool {
69 QINIU_CONFIG.read().unwrap().is_some()
70}
71
72fn build_http_client() -> HTTPClient {
73 let mut base_timeout_ms = 30000u64;
74 let mut dial_timeout_ms = DEFAULT_DIAL_TIMEOUT_MS;
75
76 if let Some(config) = QINIU_CONFIG.read().unwrap().as_ref() {
77 if let Some(value) = config.base_timeout_ms {
78 if value > 0 {
79 base_timeout_ms = value;
80 }
81 }
82 if let Some(value) = config.dial_timeout_ms {
83 if value > 0 {
84 dial_timeout_ms = value;
85 }
86 }
87 }
88 let user_agent = format!("QiniuRustUpload/{}", env!("CARGO_PKG_VERSION"));
89 HTTPClient::builder()
90 .user_agent(user_agent)
91 .connect_timeout(Duration::from_millis(dial_timeout_ms))
92 .timeout(Duration::from_millis(base_timeout_ms))
93 .pool_max_idle_per_host(5)
94 .connection_verbose(true)
95 .build()
96 .expect("Failed to build Reqwest Client")
97}
98
99#[cfg(test)]
100const DEFAULT_DIAL_TIMEOUT_MS: u64 = 1000;
101
102#[cfg(not(test))]
103const DEFAULT_DIAL_TIMEOUT_MS: u64 = 50;
104
105const QINIU_ENV: &str = "QINIU";
106
107fn load_config() -> Option<Config> {
108 if let Ok(qiniu_config_path) = env::var(QINIU_ENV) {
109 if let Ok(qiniu_config) = fs::read(&qiniu_config_path) {
110 let qiniu_config: Option<Config> = if qiniu_config_path.ends_with(".toml") {
111 toml::from_slice(&qiniu_config).ok()
112 } else {
113 serde_json::from_slice(&qiniu_config).ok()
114 };
115 if let Some(qiniu_config) = qiniu_config {
116 setup_config_watcher(&qiniu_config_path).ok();
117 return Some(qiniu_config);
118 } else {
119 error!(
120 "Qiniu config file cannot be deserialized: {}",
121 qiniu_config_path
122 );
123 return None;
124 }
125 } else {
126 error!("Qiniu config file cannot be open: {}", qiniu_config_path);
127 return None;
128 }
129 } else {
130 warn!("QINIU Env IS NOT ENABLED");
131 return None;
132 }
133
134 fn setup_config_watcher(config_path: impl Into<PathBuf>) -> IOResult<()> {
135 let config_path = config_path
136 .into()
137 .canonicalize()
138 .tap_err(|err| warn!("Failed to canonicalize config path: {:?}", err))?;
139
140 static UNIQUE_THREAD: OnceCell<JoinHandle<()>> = OnceCell::new();
141
142 if let Err(err) = UNIQUE_THREAD.get_or_try_init(|| {
143 ThreadBuilder::new()
144 .name("qiniu-config-watcher".into())
145 .spawn(move || {
146 if let Err(err) = setup_config_watcher_inner(&config_path) {
147 error!("Qiniu config file watcher was setup failed: {:?}", err);
148 }
149 })
150 }) {
151 error!(
152 "Failed to start thread to watch Qiniu config file: {:?}",
153 err
154 );
155 }
156
157 return Ok(());
158
159 fn setup_config_watcher_inner(config_path: &Path) -> NotifyResult<()> {
160 let (tx, rx) = channel();
161 let mut watcher = watcher(tx, Duration::from_millis(500))?;
162 watcher.watch(
163 config_path.parent().unwrap_or_else(|| Path::new("/")),
164 RecursiveMode::NonRecursive,
165 )?;
166
167 info!("Qiniu config file watcher was setup");
168
169 loop {
170 match rx.recv() {
171 Ok(event) => match event {
172 DebouncedEvent::Create(ref path) if path == config_path => {
173 event_received(event);
174 }
175 DebouncedEvent::Write(ref path) if path == config_path => {
176 event_received(event);
177 }
178 DebouncedEvent::Error(err, _) => {
179 error!(
180 "Received error event from Qiniu config file watcher: {:?}",
181 err
182 );
183 }
184 _ => {}
185 },
186 Err(err) => {
187 error!(
188 "Failed to receive event from Qiniu config file watcher: {:?}",
189 err
190 );
191 }
192 }
193 }
194 }
195
196 fn event_received(event: DebouncedEvent) {
197 info!("Received event {:?} from Qiniu config file watcher", event);
198 for handle in CONFIG_UPDATE_HANDLERS.read().unwrap().iter() {
199 handle();
200 }
201 }
202 }
203}
204
205type ConfigUpdateHandler = fn();
206type ConfigUpdateHandlers = Vec<ConfigUpdateHandler>;
207static CONFIG_UPDATE_HANDLERS: Lazy<RwLock<ConfigUpdateHandlers>> = Lazy::new(Default::default);
208
209pub(super) fn on_config_updated(handle: fn()) {
210 CONFIG_UPDATE_HANDLERS.write().unwrap().push(handle);
211}
212
213impl Config {
214 pub fn builder(
216 access_key: impl Into<String>,
217 secret_key: impl Into<String>,
218 bucket: impl Into<String>,
219 ) -> ConfigBuilder {
220 ConfigBuilder::new(access_key, secret_key, bucket)
221 }
222}
223
224#[derive(Debug)]
226pub struct ConfigBuilder {
227 inner: Config,
228}
229
230impl ConfigBuilder {
231 #[inline]
233 pub fn new(
234 access_key: impl Into<String>,
235 secret_key: impl Into<String>,
236 bucket: impl Into<String>,
237 ) -> Self {
238 Self {
239 inner: Config {
240 access_key: access_key.into(),
241 secret_key: secret_key.into(),
242 bucket: bucket.into(),
243 up_urls: None,
244 uc_urls: None,
245 part_size: None,
246 retry: None,
247 punish_time_s: None,
248 base_timeout_ms: None,
249 base_timeout_multiple_percents: None,
250 dial_timeout_ms: None,
251 },
252 }
253 }
254
255 #[inline]
257 pub fn build(self) -> Config {
258 self.inner
259 }
260
261 #[inline]
263 pub fn up_urls(mut self, up_urls: Vec<String>) -> Self {
264 self.inner.up_urls = Some(up_urls);
265 self
266 }
267
268 #[inline]
270 pub fn uc_urls(mut self, uc_urls: Vec<String>) -> Self {
271 self.inner.uc_urls = Some(uc_urls);
272 self
273 }
274
275 #[inline]
277 pub fn part_size(mut self, part_size: u64) -> Self {
278 self.inner.part_size = Some(part_size);
279 self
280 }
281
282 #[inline]
284 pub fn retry(mut self, retry: usize) -> Self {
285 self.inner.retry = Some(retry);
286 self
287 }
288
289 #[inline]
291 pub fn punish_time_s(mut self, punish_duration: Duration) -> Self {
292 self.inner.punish_time_s = Some(punish_duration.as_millis().try_into().unwrap_or(u64::MAX));
293 self
294 }
295
296 #[inline]
298 pub fn base_timeout_ms(mut self, base_timeout_duration: Duration) -> Self {
299 self.inner.base_timeout_ms = Some(
300 base_timeout_duration
301 .as_millis()
302 .try_into()
303 .unwrap_or(u64::MAX),
304 );
305 self
306 }
307
308 #[inline]
310 pub fn add_base_timeout_multiple_percent(
311 mut self,
312 service_name: ServiceName,
313 percent: u32,
314 ) -> Self {
315 if let Some(percents) = &mut self.inner.base_timeout_multiple_percents {
316 percents.insert(service_name.to_string(), percent);
317 } else {
318 let mut percents = HashMap::new();
319 percents.insert(service_name.to_string(), percent);
320 self.inner.base_timeout_multiple_percents = Some(percents);
321 }
322 self
323 }
324
325 #[inline]
327 pub fn dial_timeout_ms(mut self, dial_timeout_duration: Duration) -> Self {
328 self.inner.dial_timeout_ms = Some(
329 dial_timeout_duration
330 .as_millis()
331 .try_into()
332 .unwrap_or(u64::MAX),
333 );
334 self
335 }
336}
337
338#[inline]
339pub(super) fn build_uploader_builder_from_env() -> Option<UploaderBuilder> {
340 QINIU_CONFIG
341 .read()
342 .unwrap()
343 .as_ref()
344 .map(build_uploader_builder_from_config)
345}
346
347fn build_uploader_builder_from_config(config: &Config) -> UploaderBuilder {
348 let mut builder = UploaderBuilder::new(&config.access_key, &config.secret_key, &config.bucket);
349 if let Some(up_urls) = config.up_urls.as_ref() {
350 builder = builder.up_urls(up_urls.to_owned());
351 }
352 if let Some(uc_urls) = config.uc_urls.as_ref() {
353 builder = builder.uc_urls(uc_urls.to_owned());
354 }
355 if let Some(retry) = config.retry.as_ref() {
356 builder = builder
357 .up_tries(retry.to_owned())
358 .uc_tries(retry.to_owned());
359 }
360 if let Some(base_timeout_multiple_percents) = config.base_timeout_multiple_percents.as_ref() {
361 if let Some(&uc_timeout_multiple_percents) =
362 base_timeout_multiple_percents.get(&ServiceName::Uc.to_string())
363 {
364 builder = builder.uc_timeout_multiple(uc_timeout_multiple_percents);
365 }
366 if let Some(&up_timeout_multiple_percents) =
367 base_timeout_multiple_percents.get(&ServiceName::Up.to_string())
368 {
369 builder = builder.up_timeout_multiple(up_timeout_multiple_percents);
370 }
371 }
372
373 if let Some(punish_time_s) = config.punish_time_s.as_ref() {
374 builder = builder.punish_duration(Duration::from_secs(punish_time_s.to_owned()));
375 }
376 if let Some(base_timeout_ms) = config.base_timeout_ms.as_ref() {
377 builder = builder.base_timeout(Duration::from_millis(base_timeout_ms.to_owned()));
378 }
379 if let Some(part_size) = config.part_size.as_ref() {
380 builder = builder.part_size(part_size.to_owned() * (1 << 20));
381 }
382 builder
383}
384
385#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
387#[non_exhaustive]
388pub enum ServiceName {
389 Up,
391 Uc,
393}
394
395impl fmt::Display for ServiceName {
396 #[inline]
397 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
398 match self {
399 Self::Uc => "uc".fmt(f),
400 Self::Up => "up".fmt(f),
401 }
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use super::*;
408 use std::{
409 error::Error,
410 fs::{remove_file, rename, OpenOptions},
411 io::Write,
412 sync::atomic::{AtomicUsize, Ordering::Relaxed},
413 thread::sleep,
414 };
415 use tempfile::Builder as TempFileBuilder;
416
417 #[test]
418 fn test_load_config() -> Result<(), Box<dyn Error>> {
419 env_logger::try_init().ok();
420
421 let mut config = Config {
422 access_key: "test-ak-1".into(),
423 secret_key: "test-sk-1".into(),
424 bucket: "test-bucket-1".into(),
425 up_urls: Some(vec!["http://up1.com".into(), "http://up2.com".into()]),
426 uc_urls: Default::default(),
427 retry: Default::default(),
428 punish_time_s: Default::default(),
429 base_timeout_ms: Default::default(),
430 dial_timeout_ms: Default::default(),
431 part_size: Default::default(),
432 base_timeout_multiple_percents: Default::default(),
433 };
434 let tempfile_path = {
435 let mut tempfile = TempFileBuilder::new().suffix(".toml").tempfile()?;
436 tempfile.write_all(&toml::to_vec(&config)?)?;
437 tempfile.flush()?;
438 env::set_var(QINIU_ENV, tempfile.path().as_os_str());
439 tempfile.into_temp_path()
440 };
441
442 static UPDATED: AtomicUsize = AtomicUsize::new(0);
443 UPDATED.store(0, Relaxed);
444
445 let loaded = load_config().unwrap();
446 assert_eq!(loaded, config);
447
448 on_config_updated(|| {
449 UPDATED.fetch_add(1, Relaxed);
450 });
451 on_config_updated(|| {
452 UPDATED.fetch_add(1, Relaxed);
453 });
454 on_config_updated(|| {
455 UPDATED.fetch_add(1, Relaxed);
456 });
457
458 sleep(Duration::from_secs(1));
459
460 config.access_key = "test-ak-2".into();
461 config.secret_key = "test-sk-2".into();
462 config.bucket = "test-bucket-2".into();
463
464 {
465 let mut tempfile = OpenOptions::new()
466 .write(true)
467 .truncate(true)
468 .open(&tempfile_path)?;
469 tempfile.write_all(&toml::to_vec(&config)?)?;
470 tempfile.flush()?;
471 }
472
473 sleep(Duration::from_secs(1));
474 assert_eq!(UPDATED.load(Relaxed), 3);
475
476 config.access_key = "test-ak-3".into();
477 config.secret_key = "test-sk-3".into();
478 config.bucket = "test-bucket-3".into();
479
480 {
481 remove_file(&tempfile_path)?;
482 let mut tempfile = OpenOptions::new()
483 .write(true)
484 .create(true)
485 .open(&tempfile_path)?;
486 tempfile.write_all(&toml::to_vec(&config)?)?;
487 tempfile.flush()?;
488 }
489
490 sleep(Duration::from_secs(1));
491 assert_eq!(UPDATED.load(Relaxed), 6);
492
493 {
494 let new_tempfile_path = {
495 let mut new_path = tempfile_path.to_owned().into_os_string();
496 new_path.push(".tmp");
497 new_path
498 };
499 let mut tempfile = OpenOptions::new()
500 .write(true)
501 .create(true)
502 .open(&new_tempfile_path)?;
503 tempfile.write_all(&toml::to_vec(&config)?)?;
504 tempfile.flush()?;
505 rename(&new_tempfile_path, &tempfile_path)?;
506 }
507
508 sleep(Duration::from_secs(1));
509 assert_eq!(UPDATED.load(Relaxed), 9);
510
511 {
512 let new_tempfile_path = {
513 let mut new_path = tempfile_path.to_owned().into_os_string();
514 new_path.push(".tmp");
515 new_path
516 };
517 let mut tempfile = OpenOptions::new()
518 .write(true)
519 .create(true)
520 .open(&new_tempfile_path)?;
521 tempfile.write_all(&toml::to_vec(&config)?)?;
522 tempfile.flush()?;
523 remove_file(&tempfile_path)?;
524 rename(&new_tempfile_path, &tempfile_path)?;
525 }
526
527 sleep(Duration::from_secs(1));
528 assert_eq!(UPDATED.load(Relaxed), 12);
529
530 remove_file(&tempfile_path)?;
531
532 Ok(())
533 }
534}