qiniu_upload/
config.rs

1use crate::UploaderBuilder;
2use log::{error, info, warn};
3use notify::{watcher, DebouncedEvent, RecursiveMode, Result as NotifyResult, Watcher};
4use once_cell::sync::{Lazy, OnceCell};
5use reqwest::blocking::Client as HTTPClient;
6use serde::{Deserialize, Serialize};
7use std::{
8    collections::HashMap,
9    convert::TryInto,
10    env, fmt, fs,
11    io::Result as IOResult,
12    path::{Path, PathBuf},
13    sync::{mpsc::channel, RwLock},
14    thread::{Builder as ThreadBuilder, JoinHandle},
15    time::Duration,
16};
17use tap::prelude::*;
18
/// Qiniu configuration.
///
/// Deserialized from the config file (TOML or JSON) by `load_config`, or assembled
/// programmatically through `ConfigBuilder`.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct Config {
    // Access key; also accepted under the key `ak`.
    #[serde(alias = "ak")]
    access_key: String,
    // Secret key; also accepted under the key `sk`.
    #[serde(alias = "sk")]
    secret_key: String,

    // Name of the target bucket.
    bucket: String,

    // UP server URLs; also accepted under the key `up_hosts`.
    #[serde(alias = "up_hosts")]
    up_urls: Option<Vec<String>>,

    // UC server URLs; also accepted under the key `uc_hosts`.
    #[serde(alias = "uc_hosts")]
    uc_urls: Option<Vec<String>>,

    // Upload part size in MB (multiplied by 1 << 20 when the uploader is built);
    // also accepted under the key `part`.
    #[serde(alias = "part")]
    part_size: Option<u64>,

    // Number of tries applied to both the UP and the UC servers.
    retry: Option<usize>,
    // Punish duration in seconds (read back with `Duration::from_secs`).
    punish_time_s: Option<u64>,
    // Base request timeout in milliseconds.
    base_timeout_ms: Option<u64>,
    // Per-service timeout multiplier in percent, keyed by the service name ("up" / "uc").
    base_timeout_multiple_percents: Option<HashMap<String, u32>>,
    // Connect (dial) timeout in milliseconds.
    dial_timeout_ms: Option<u64>,
}
44
45static QINIU_CONFIG: Lazy<RwLock<Option<Config>>> = Lazy::new(|| {
46    RwLock::new(load_config()).tap(|_| {
47        on_config_updated(|| {
48            if let Some(config) = load_config() {
49                *QINIU_CONFIG.write().unwrap() = Some(config);
50            }
51            info!("QINIU_CONFIG reloaded: {:?}", QINIU_CONFIG);
52        })
53    })
54});
55pub(super) static HTTP_CLIENT: Lazy<RwLock<HTTPClient>> = Lazy::new(|| {
56    RwLock::new(build_http_client()).tap(|_| {
57        on_config_updated(|| {
58            *HTTP_CLIENT.write().unwrap() = build_http_client();
59            info!("HTTP_CLIENT reloaded: {:?}", HTTP_CLIENT);
60        })
61    })
62});
63
64/// 判断当前是否已经启用七牛环境
65///
66/// 如果当前没有设置 QINIU 环境变量,或加载该环境变量出现错误,则返回 false
67#[inline]
68pub fn is_qiniu_enabled() -> bool {
69    QINIU_CONFIG.read().unwrap().is_some()
70}
71
72fn build_http_client() -> HTTPClient {
73    let mut base_timeout_ms = 30000u64;
74    let mut dial_timeout_ms = DEFAULT_DIAL_TIMEOUT_MS;
75
76    if let Some(config) = QINIU_CONFIG.read().unwrap().as_ref() {
77        if let Some(value) = config.base_timeout_ms {
78            if value > 0 {
79                base_timeout_ms = value;
80            }
81        }
82        if let Some(value) = config.dial_timeout_ms {
83            if value > 0 {
84                dial_timeout_ms = value;
85            }
86        }
87    }
88    let user_agent = format!("QiniuRustUpload/{}", env!("CARGO_PKG_VERSION"));
89    HTTPClient::builder()
90        .user_agent(user_agent)
91        .connect_timeout(Duration::from_millis(dial_timeout_ms))
92        .timeout(Duration::from_millis(base_timeout_ms))
93        .pool_max_idle_per_host(5)
94        .connection_verbose(true)
95        .build()
96        .expect("Failed to build Reqwest Client")
97}
98
// Default connect (dial) timeout in milliseconds used by test builds.
#[cfg(test)]
const DEFAULT_DIAL_TIMEOUT_MS: u64 = 1000;

// Default connect (dial) timeout in milliseconds used by non-test builds.
#[cfg(not(test))]
const DEFAULT_DIAL_TIMEOUT_MS: u64 = 50;

// Name of the environment variable holding the path of the Qiniu config file.
const QINIU_ENV: &str = "QINIU";
106
107fn load_config() -> Option<Config> {
108    if let Ok(qiniu_config_path) = env::var(QINIU_ENV) {
109        if let Ok(qiniu_config) = fs::read(&qiniu_config_path) {
110            let qiniu_config: Option<Config> = if qiniu_config_path.ends_with(".toml") {
111                toml::from_slice(&qiniu_config).ok()
112            } else {
113                serde_json::from_slice(&qiniu_config).ok()
114            };
115            if let Some(qiniu_config) = qiniu_config {
116                setup_config_watcher(&qiniu_config_path).ok();
117                return Some(qiniu_config);
118            } else {
119                error!(
120                    "Qiniu config file cannot be deserialized: {}",
121                    qiniu_config_path
122                );
123                return None;
124            }
125        } else {
126            error!("Qiniu config file cannot be open: {}", qiniu_config_path);
127            return None;
128        }
129    } else {
130        warn!("QINIU Env IS NOT ENABLED");
131        return None;
132    }
133
134    fn setup_config_watcher(config_path: impl Into<PathBuf>) -> IOResult<()> {
135        let config_path = config_path
136            .into()
137            .canonicalize()
138            .tap_err(|err| warn!("Failed to canonicalize config path: {:?}", err))?;
139
140        static UNIQUE_THREAD: OnceCell<JoinHandle<()>> = OnceCell::new();
141
142        if let Err(err) = UNIQUE_THREAD.get_or_try_init(|| {
143            ThreadBuilder::new()
144                .name("qiniu-config-watcher".into())
145                .spawn(move || {
146                    if let Err(err) = setup_config_watcher_inner(&config_path) {
147                        error!("Qiniu config file watcher was setup failed: {:?}", err);
148                    }
149                })
150        }) {
151            error!(
152                "Failed to start thread to watch Qiniu config file: {:?}",
153                err
154            );
155        }
156
157        return Ok(());
158
159        fn setup_config_watcher_inner(config_path: &Path) -> NotifyResult<()> {
160            let (tx, rx) = channel();
161            let mut watcher = watcher(tx, Duration::from_millis(500))?;
162            watcher.watch(
163                config_path.parent().unwrap_or_else(|| Path::new("/")),
164                RecursiveMode::NonRecursive,
165            )?;
166
167            info!("Qiniu config file watcher was setup");
168
169            loop {
170                match rx.recv() {
171                    Ok(event) => match event {
172                        DebouncedEvent::Create(ref path) if path == config_path => {
173                            event_received(event);
174                        }
175                        DebouncedEvent::Write(ref path) if path == config_path => {
176                            event_received(event);
177                        }
178                        DebouncedEvent::Error(err, _) => {
179                            error!(
180                                "Received error event from Qiniu config file watcher: {:?}",
181                                err
182                            );
183                        }
184                        _ => {}
185                    },
186                    Err(err) => {
187                        error!(
188                            "Failed to receive event from Qiniu config file watcher: {:?}",
189                            err
190                        );
191                    }
192                }
193            }
194        }
195
196        fn event_received(event: DebouncedEvent) {
197            info!("Received event {:?} from Qiniu config file watcher", event);
198            for handle in CONFIG_UPDATE_HANDLERS.read().unwrap().iter() {
199                handle();
200            }
201        }
202    }
203}
204
// A callback invoked when the config file watcher reports a change.
type ConfigUpdateHandler = fn();
// The list of registered callbacks, kept in registration order.
type ConfigUpdateHandlers = Vec<ConfigUpdateHandler>;
// Global registry of config-update callbacks; starts out empty.
static CONFIG_UPDATE_HANDLERS: Lazy<RwLock<ConfigUpdateHandlers>> = Lazy::new(Default::default);
208
209pub(super) fn on_config_updated(handle: fn()) {
210    CONFIG_UPDATE_HANDLERS.write().unwrap().push(handle);
211}
212
impl Config {
    /// Creates a Qiniu configuration builder.
    ///
    /// Shorthand for `ConfigBuilder::new` with the same arguments.
    pub fn builder(
        access_key: impl Into<String>,
        secret_key: impl Into<String>,
        bucket: impl Into<String>,
    ) -> ConfigBuilder {
        ConfigBuilder::new(access_key, secret_key, bucket)
    }
}
223
/// Builder for the Qiniu configuration.
#[derive(Debug)]
pub struct ConfigBuilder {
    // The configuration being assembled; handed out by `build`.
    inner: Config,
}
229
230impl ConfigBuilder {
231    /// 创建七牛配置信息构建器
232    #[inline]
233    pub fn new(
234        access_key: impl Into<String>,
235        secret_key: impl Into<String>,
236        bucket: impl Into<String>,
237    ) -> Self {
238        Self {
239            inner: Config {
240                access_key: access_key.into(),
241                secret_key: secret_key.into(),
242                bucket: bucket.into(),
243                up_urls: None,
244                uc_urls: None,
245                part_size: None,
246                retry: None,
247                punish_time_s: None,
248                base_timeout_ms: None,
249                base_timeout_multiple_percents: None,
250                dial_timeout_ms: None,
251            },
252        }
253    }
254
255    /// 构建七牛配置信息
256    #[inline]
257    pub fn build(self) -> Config {
258        self.inner
259    }
260
261    /// 配置 UP 服务器域名列表
262    #[inline]
263    pub fn up_urls(mut self, up_urls: Vec<String>) -> Self {
264        self.inner.up_urls = Some(up_urls);
265        self
266    }
267
268    /// 配置 UC 服务器域名列表
269    #[inline]
270    pub fn uc_urls(mut self, uc_urls: Vec<String>) -> Self {
271        self.inner.uc_urls = Some(uc_urls);
272        self
273    }
274
275    /// 配置默认上传分片大小,单位为 MB,默认为 4 MB
276    #[inline]
277    pub fn part_size(mut self, part_size: u64) -> Self {
278        self.inner.part_size = Some(part_size);
279        self
280    }
281
282    /// 配置 UP 和 UC 服务器访问重试次数,默认为 10
283    #[inline]
284    pub fn retry(mut self, retry: usize) -> Self {
285        self.inner.retry = Some(retry);
286        self
287    }
288
289    /// 配置域名访问失败后的惩罚时长,默认为 30 分钟
290    #[inline]
291    pub fn punish_time_s(mut self, punish_duration: Duration) -> Self {
292        self.inner.punish_time_s = Some(punish_duration.as_millis().try_into().unwrap_or(u64::MAX));
293        self
294    }
295
296    /// 配置域名访问的基础超时时长,默认为 30 秒
297    #[inline]
298    pub fn base_timeout_ms(mut self, base_timeout_duration: Duration) -> Self {
299        self.inner.base_timeout_ms = Some(
300            base_timeout_duration
301                .as_millis()
302                .try_into()
303                .unwrap_or(u64::MAX),
304        );
305        self
306    }
307
308    /// 配置域名访问的基础超时时长倍数百分比,service_name 指的是服务名称,percent 指的是倍数百分比,最终服务的基础超时时长为 base_timeout_ms * 该服务对应的 percent / 100
309    #[inline]
310    pub fn add_base_timeout_multiple_percent(
311        mut self,
312        service_name: ServiceName,
313        percent: u32,
314    ) -> Self {
315        if let Some(percents) = &mut self.inner.base_timeout_multiple_percents {
316            percents.insert(service_name.to_string(), percent);
317        } else {
318            let mut percents = HashMap::new();
319            percents.insert(service_name.to_string(), percent);
320            self.inner.base_timeout_multiple_percents = Some(percents);
321        }
322        self
323    }
324
325    /// 配置域名连接的超时时长,默认为 50 毫秒
326    #[inline]
327    pub fn dial_timeout_ms(mut self, dial_timeout_duration: Duration) -> Self {
328        self.inner.dial_timeout_ms = Some(
329            dial_timeout_duration
330                .as_millis()
331                .try_into()
332                .unwrap_or(u64::MAX),
333        );
334        self
335    }
336}
337
338#[inline]
339pub(super) fn build_uploader_builder_from_env() -> Option<UploaderBuilder> {
340    QINIU_CONFIG
341        .read()
342        .unwrap()
343        .as_ref()
344        .map(build_uploader_builder_from_config)
345}
346
347fn build_uploader_builder_from_config(config: &Config) -> UploaderBuilder {
348    let mut builder = UploaderBuilder::new(&config.access_key, &config.secret_key, &config.bucket);
349    if let Some(up_urls) = config.up_urls.as_ref() {
350        builder = builder.up_urls(up_urls.to_owned());
351    }
352    if let Some(uc_urls) = config.uc_urls.as_ref() {
353        builder = builder.uc_urls(uc_urls.to_owned());
354    }
355    if let Some(retry) = config.retry.as_ref() {
356        builder = builder
357            .up_tries(retry.to_owned())
358            .uc_tries(retry.to_owned());
359    }
360    if let Some(base_timeout_multiple_percents) = config.base_timeout_multiple_percents.as_ref() {
361        if let Some(&uc_timeout_multiple_percents) =
362            base_timeout_multiple_percents.get(&ServiceName::Uc.to_string())
363        {
364            builder = builder.uc_timeout_multiple(uc_timeout_multiple_percents);
365        }
366        if let Some(&up_timeout_multiple_percents) =
367            base_timeout_multiple_percents.get(&ServiceName::Up.to_string())
368        {
369            builder = builder.up_timeout_multiple(up_timeout_multiple_percents);
370        }
371    }
372
373    if let Some(punish_time_s) = config.punish_time_s.as_ref() {
374        builder = builder.punish_duration(Duration::from_secs(punish_time_s.to_owned()));
375    }
376    if let Some(base_timeout_ms) = config.base_timeout_ms.as_ref() {
377        builder = builder.base_timeout(Duration::from_millis(base_timeout_ms.to_owned()));
378    }
379    if let Some(part_size) = config.part_size.as_ref() {
380        builder = builder.part_size(part_size.to_owned() * (1 << 20));
381    }
382    builder
383}
384
/// Service name.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ServiceName {
    /// UP server
    Up,
    /// UC server
    Uc,
}
394
395impl fmt::Display for ServiceName {
396    #[inline]
397    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
398        match self {
399            Self::Uc => "uc".fmt(f),
400            Self::Up => "up".fmt(f),
401        }
402    }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        error::Error,
        fs::{remove_file, rename, OpenOptions},
        io::Write,
        sync::atomic::{AtomicUsize, Ordering::Relaxed},
        thread::sleep,
    };
    use tempfile::Builder as TempFileBuilder;

    // Checks that `load_config` reads a TOML config file and that the registered
    // update handlers run each time the file is replaced in one of four ways:
    // in-place rewrite, remove + create, rename over it, and remove + rename.
    #[test]
    fn test_load_config() -> Result<(), Box<dyn Error>> {
        env_logger::try_init().ok();

        let mut config = Config {
            access_key: "test-ak-1".into(),
            secret_key: "test-sk-1".into(),
            bucket: "test-bucket-1".into(),
            up_urls: Some(vec!["http://up1.com".into(), "http://up2.com".into()]),
            uc_urls: Default::default(),
            retry: Default::default(),
            punish_time_s: Default::default(),
            base_timeout_ms: Default::default(),
            dial_timeout_ms: Default::default(),
            part_size: Default::default(),
            base_timeout_multiple_percents: Default::default(),
        };
        // Write the initial config to a `.toml` temp file and point the QINIU
        // environment variable at it.
        let tempfile_path = {
            let mut tempfile = TempFileBuilder::new().suffix(".toml").tempfile()?;
            tempfile.write_all(&toml::to_vec(&config)?)?;
            tempfile.flush()?;
            env::set_var(QINIU_ENV, tempfile.path().as_os_str());
            tempfile.into_temp_path()
        };

        // Counts handler invocations across all three handlers registered below.
        static UPDATED: AtomicUsize = AtomicUsize::new(0);
        UPDATED.store(0, Relaxed);

        // A successful load also starts the config file watcher.
        let loaded = load_config().unwrap();
        assert_eq!(loaded, config);

        // Three handlers, so every observed change adds 3 to `UPDATED`.
        on_config_updated(|| {
            UPDATED.fetch_add(1, Relaxed);
        });
        on_config_updated(|| {
            UPDATED.fetch_add(1, Relaxed);
        });
        on_config_updated(|| {
            UPDATED.fetch_add(1, Relaxed);
        });

        // Presumably gives the watcher thread time to start before the first change.
        sleep(Duration::from_secs(1));

        config.access_key = "test-ak-2".into();
        config.secret_key = "test-sk-2".into();
        config.bucket = "test-bucket-2".into();

        // Case 1: rewrite the existing file in place.
        {
            let mut tempfile = OpenOptions::new()
                .write(true)
                .truncate(true)
                .open(&tempfile_path)?;
            tempfile.write_all(&toml::to_vec(&config)?)?;
            tempfile.flush()?;
        }

        // Wait longer than the watcher's 500 ms debounce delay before checking.
        sleep(Duration::from_secs(1));
        assert_eq!(UPDATED.load(Relaxed), 3);

        config.access_key = "test-ak-3".into();
        config.secret_key = "test-sk-3".into();
        config.bucket = "test-bucket-3".into();

        // Case 2: remove the file, then create it again at the same path.
        {
            remove_file(&tempfile_path)?;
            let mut tempfile = OpenOptions::new()
                .write(true)
                .create(true)
                .open(&tempfile_path)?;
            tempfile.write_all(&toml::to_vec(&config)?)?;
            tempfile.flush()?;
        }

        sleep(Duration::from_secs(1));
        assert_eq!(UPDATED.load(Relaxed), 6);

        // Case 3: write a sibling `.tmp` file and rename it over the config file.
        {
            let new_tempfile_path = {
                let mut new_path = tempfile_path.to_owned().into_os_string();
                new_path.push(".tmp");
                new_path
            };
            let mut tempfile = OpenOptions::new()
                .write(true)
                .create(true)
                .open(&new_tempfile_path)?;
            tempfile.write_all(&toml::to_vec(&config)?)?;
            tempfile.flush()?;
            rename(&new_tempfile_path, &tempfile_path)?;
        }

        sleep(Duration::from_secs(1));
        assert_eq!(UPDATED.load(Relaxed), 9);

        // Case 4: write a sibling `.tmp` file, remove the config file, then rename
        // the `.tmp` file into its place.
        {
            let new_tempfile_path = {
                let mut new_path = tempfile_path.to_owned().into_os_string();
                new_path.push(".tmp");
                new_path
            };
            let mut tempfile = OpenOptions::new()
                .write(true)
                .create(true)
                .open(&new_tempfile_path)?;
            tempfile.write_all(&toml::to_vec(&config)?)?;
            tempfile.flush()?;
            remove_file(&tempfile_path)?;
            rename(&new_tempfile_path, &tempfile_path)?;
        }

        sleep(Duration::from_secs(1));
        assert_eq!(UPDATED.load(Relaxed), 12);

        // Clean up the config file.
        remove_file(&tempfile_path)?;

        Ok(())
    }
}
534}