Skip to main content

ve_tos_rust_sdk/
common.rs

/*
 * Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16use crate::enumeration::{CannedType, GranteeType, PermissionType};
17use crate::log::DualRollingConfig;
18use chrono::{DateTime, Utc};
19use once_cell::sync::OnceCell;
20use serde::{Deserialize, Deserializer, Serialize};
21use serde_json::Value;
22use std::collections::HashMap;
23use std::ops::Sub;
24use std::path::Path;
25use std::sync::mpsc::Sender;
26use std::sync::Mutex;
27use std::time::Duration;
28use tracing_appender::non_blocking::WorkerGuard;
29use tracing_subscriber::layer::SubscriberExt;
30use tracing_subscriber::util::SubscriberInitExt;
31use tracing_subscriber::{EnvFilter, Layer, Registry};
32
33static COMMON_LOG_TARGET: OnceCell<String> = OnceCell::new();
34
35pub(crate) fn get_common_log_target() -> &'static str {
36    if let Some(target) = COMMON_LOG_TARGET.get() {
37        return target.as_str();
38    }
39    "common"
40}
41
42pub fn init_tracing_log(directives: impl AsRef<str>, directory: impl AsRef<Path>,
43                        file_name_prefix: impl AsRef<Path>) -> WorkerGuard {
44    let file_appender = tracing_appender::rolling::daily(directory, file_name_prefix);
45    let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
46
47    let directives = directives.as_ref();
48    if !directives.contains("=") {
49        COMMON_LOG_TARGET.get_or_init(move || {
50            String::from("common")
51        });
52        tracing_subscriber::fmt()
53            .with_line_number(true)
54            .with_target(true)
55            .with_thread_ids(true)
56            .with_writer(non_blocking)
57            .with_env_filter(EnvFilter::new(format!("common={}", directives)))
58            .with_ansi(false).init();
59    } else {
60        directives.splitn(2, '=').take(1).for_each(|x| {
61            COMMON_LOG_TARGET.get_or_init(move || {
62                x.to_string()
63            });
64        });
65        tracing_subscriber::fmt()
66            .with_line_number(true)
67            .with_target(true)
68            .with_thread_ids(true)
69            .with_writer(non_blocking)
70            .with_env_filter(EnvFilter::new(directives))
71            .with_ansi(false).init();
72    }
73    guard
74}
75
76pub fn init_tracing_logs(common_log: impl Into<(String, String)>, other_logs: impl Iterator<Item=(String, String)>, directory: impl AsRef<Path>,
77                         max_file_size: usize, rotate_daily: bool, max_file_number: usize) -> Vec<WorkerGuard> {
78    let mut log_pairs = Vec::with_capacity(8);
79    let (mut name, mut level) = common_log.into();
80    if name.is_empty() {
81        name = String::from("common");
82    }
83
84    let common_log_target = name.clone();
85    COMMON_LOG_TARGET.get_or_init(move || {
86        common_log_target
87    });
88
89    if level.is_empty() {
90        level = String::from("warn");
91    }
92
93    log_pairs.push((name, level));
94    for (name, level) in other_logs {
95        if name.is_empty() || level.is_empty() {
96            continue;
97        }
98        log_pairs.push((name, level));
99    }
100
101    let mut guards = Vec::with_capacity(log_pairs.len());
102    let mut layers: Vec<Box<dyn Layer<Registry> + Send + Sync>> = Vec::with_capacity(log_pairs.len());
103    for (name, level) in log_pairs {
104        let writer = DualRollingConfig::new(directory.as_ref(), name.as_str())
105            .max_file_size(max_file_size)
106            .rotate_daily(rotate_daily)
107            .max_file_number(max_file_number)
108            .build().unwrap();
109
110        let (non_blocking, guard) = tracing_appender::non_blocking(writer);
111        // 依赖 impl<S, L, F> Layer<S> for Filtered<L, F, S>
112        let layer = tracing_subscriber::fmt::layer()
113            .with_target(true)
114            .with_ansi(false)
115            .with_file(true)
116            .with_line_number(true)
117            .with_writer(non_blocking)
118            .with_filter(EnvFilter::new(format!("{}={}", name, level)));
119
120        layers.push(Box::new(layer));
121        guards.push(guard);
122    }
123
124    tracing_subscriber::registry()
125        .with(layers)
126        .init();
127    guards
128}
129
130#[derive(Debug, Clone, PartialEq, Default)]
131pub struct GenericInput {
132    pub(crate) request_date: Option<DateTime<Utc>>,
133    pub(crate) request_host: String,
134    pub(crate) request_header: Option<HashMap<String, String>>,
135    pub(crate) request_query: Option<HashMap<String, String>>,
136}
137
138pub trait GenericInputTrait {
139    fn request_date(&self) -> Option<DateTime<Utc>>;
140
141    fn request_host(&self) -> &str;
142
143    fn request_header(&self) -> &Option<HashMap<String, String>>;
144    fn request_query(&self) -> &Option<HashMap<String, String>>;
145}
146
/// Read access to the request information attached to a response.
pub trait RequestInfoTrait {
    /// The request id.
    fn request_id(&self) -> &str;

    /// The secondary id (`id2`).
    fn id2(&self) -> &str;

    /// The status code.
    fn status_code(&self) -> isize;
    /// The header map.
    fn header(&self) -> &HashMap<String, String>;
}
155
/// Request information: request id, secondary id, status code and header map.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct RequestInfo {
    pub(crate) request_id: String,
    pub(crate) id2: String,
    pub(crate) status_code: isize,
    pub(crate) header: HashMap<String, String>,
}

impl RequestInfo {
    /// The request id.
    pub fn request_id(&self) -> &str {
        &self.request_id
    }

    /// The secondary id (`id2`).
    pub fn id2(&self) -> &str {
        &self.id2
    }

    /// The status code.
    pub fn status_code(&self) -> isize {
        self.status_code
    }

    /// The header map.
    pub fn header(&self) -> &HashMap<String, String> {
        &self.header
    }
}
181
182impl RequestInfoTrait for RequestInfo {
183    fn request_id(&self) -> &str {
184        &self.request_id
185    }
186
187    fn id2(&self) -> &str {
188        &self.id2
189    }
190
191    fn status_code(&self) -> isize {
192        self.status_code
193    }
194
195    fn header(&self) -> &HashMap<String, String> {
196        &self.header
197    }
198}
199
/// String-to-string metadata map.
pub type Meta = HashMap<String, String>;
201
202#[derive(Debug, Clone, PartialEq, Default, Deserialize, Serialize)]
203pub struct Owner {
204    #[serde(default)]
205    #[serde(rename = "ID")]
206    pub(crate) id: String,
207}
208
209impl Owner {
210    pub fn new(id: impl Into<String>) -> Self {
211        Self {
212            id: id.into(),
213        }
214    }
215    pub fn id(&self) -> &str {
216        &self.id
217    }
218    pub fn set_id(&mut self, id: impl Into<String>) {
219        self.id = id.into();
220    }
221}
222
223#[derive(Debug, Clone, PartialEq, Default, Deserialize, Serialize)]
224pub struct Grant {
225    #[serde(default)]
226    #[serde(rename = "Grantee")]
227    pub(crate) grantee: Grantee,
228    #[serde(default)]
229    #[serde(rename = "Permission")]
230    pub(crate) permission: PermissionType,
231}
232
233impl Grant {
234    pub fn new(grantee: impl Into<Grantee>, permission: impl Into<PermissionType>) -> Self {
235        Self {
236            grantee: grantee.into(),
237            permission: permission.into(),
238        }
239    }
240    pub fn grantee(&self) -> &Grantee {
241        &self.grantee
242    }
243    pub fn permission(&self) -> &PermissionType {
244        &self.permission
245    }
246    pub fn set_grantee(&mut self, grantee: impl Into<Grantee>) {
247        self.grantee = grantee.into();
248    }
249    pub fn set_permission(&mut self, permission: impl Into<PermissionType>) {
250        self.permission = permission.into();
251    }
252}
253
254#[derive(Debug, Clone, PartialEq, Default, Deserialize, Serialize)]
255pub struct Grantee {
256    #[serde(default)]
257    #[serde(rename = "ID")]
258    #[serde(skip_serializing_if = "String::is_empty")]
259    pub(crate) id: String,
260    #[serde(default)]
261    #[serde(rename = "Type")]
262    pub(crate) grantee_type: GranteeType,
263    #[serde(default)]
264    #[serde(rename = "Canned")]
265    #[serde(skip_serializing_if = "Option::is_none")]
266    pub(crate) canned: Option<CannedType>,
267}
268
269impl Grantee {
270    pub fn new(grantee_type: impl Into<GranteeType>) -> Self {
271        Self {
272            id: "".to_string(),
273            grantee_type: grantee_type.into(),
274            canned: None,
275        }
276    }
277    pub fn new_with_id(grantee_type: impl Into<GranteeType>, id: impl Into<String>) -> Self {
278        Self {
279            id: id.into(),
280            grantee_type: grantee_type.into(),
281            canned: None,
282        }
283    }
284    pub fn new_with_canned(grantee_type: impl Into<GranteeType>, canned: impl Into<CannedType>) -> Self {
285        Self {
286            id: "".to_string(),
287            grantee_type: grantee_type.into(),
288            canned: Some(canned.into()),
289        }
290    }
291    pub fn id(&self) -> &str {
292        &self.id
293    }
294    pub fn grantee_type(&self) -> &GranteeType {
295        &self.grantee_type
296    }
297    pub fn canned(&self) -> &Option<CannedType> {
298        &self.canned
299    }
300    pub fn set_id(&mut self, id: impl Into<String>) {
301        self.id = id.into();
302    }
303    pub fn set_grantee_type(&mut self, grantee_type: impl Into<GranteeType>) {
304        self.grantee_type = grantee_type.into();
305    }
306    pub fn set_canned(&mut self, canned: impl Into<CannedType>) {
307        self.canned = Some(canned.into());
308    }
309}
310
311#[derive(Debug, Clone, PartialEq, Default, Deserialize)]
312pub struct ListedCommonPrefix {
313    #[serde(default)]
314    #[serde(rename = "Prefix")]
315    pub(crate) prefix: String,
316    #[serde(default)]
317    #[serde(rename = "LastModified")]
318    pub(crate) last_modified_string: Option<String>,
319    #[serde(skip)]
320    pub(crate) last_modified: Option<DateTime<Utc>>,
321}
322
323impl ListedCommonPrefix {
324    pub fn prefix(&self) -> &str {
325        &self.prefix
326    }
327
328    pub fn last_modified(&self) -> Option<DateTime<Utc>> {
329        self.last_modified
330    }
331}
332
333#[derive(Debug, Clone, PartialEq, Default, Serialize)]
334pub struct TagSet {
335    #[serde(rename = "Tags")]
336    #[serde(skip_serializing_if = "Vec::is_empty")]
337    pub(crate) tags: Vec<Tag>,
338}
339
340impl<'de> Deserialize<'de> for TagSet {
341    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
342    where
343        D: Deserializer<'de>,
344    {
345        match Option::<Value>::deserialize(deserializer)? {
346            None => Ok(Self::default()),
347            Some(value) => {
348                if value.is_object() {
349                    if let Some(ts) = value.get("Tags") {
350                        if ts.is_array() {
351                            if let Some(ts) = ts.as_array() {
352                                let mut tags = Vec::with_capacity(ts.len());
353                                for tag in ts {
354                                    let key = tag.get("Key").unwrap().as_str().unwrap().to_string();
355                                    let value = tag.get("Value").unwrap().as_str().unwrap().to_string();
356                                    tags.push(Tag {
357                                        key,
358                                        value,
359                                    });
360                                }
361                                return Ok(Self {
362                                    tags,
363                                });
364                            }
365                        }
366                    }
367                }
368
369                Ok(Self {
370                    tags: vec![],
371                })
372            }
373        }
374    }
375}
376
377impl TagSet {
378    pub fn new(tags: impl Into<Vec<Tag>>) -> Self {
379        Self {
380            tags: tags.into(),
381        }
382    }
383
384    pub fn tags(&self) -> &Vec<Tag> {
385        &self.tags
386    }
387
388    pub fn set_tags(&mut self, tags: impl Into<Vec<Tag>>) {
389        self.tags = tags.into();
390    }
391}
392
393
394#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
395pub struct Tag {
396    #[serde(rename = "Key")]
397    pub(crate) key: String,
398    #[serde(rename = "Value")]
399    pub(crate) value: String,
400}
401
402impl Tag {
403    pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
404        Self {
405            key: key.into(),
406            value: value.into(),
407        }
408    }
409
410    pub fn key(&self) -> &str {
411        &self.key
412    }
413
414    pub fn value(&self) -> &str {
415        &self.value
416    }
417
418    pub fn set_key(&mut self, key: impl Into<String>) {
419        self.key = key.into();
420    }
421
422    pub fn set_value(&mut self, value: impl Into<String>) {
423        self.value = value.into();
424    }
425}
426
427#[derive(Debug, Clone, PartialEq, Default, Deserialize)]
428pub(crate) struct TempCopyResult {
429    #[serde(default)]
430    #[serde(rename = "ETag")]
431    pub(crate) etag: String,
432    #[serde(default)]
433    #[serde(rename = "LastModified")]
434    pub(crate) last_modified: String,
435    #[serde(default)]
436    #[serde(rename = "Code")]
437    pub(crate) code: String,
438    #[serde(default)]
439    #[serde(rename = "Message")]
440    pub(crate) message: String,
441    #[serde(default)]
442    #[serde(rename = "HostId")]
443    pub(crate) host_id: String,
444    #[serde(default)]
445    #[serde(rename = "Resource")]
446    pub(crate) resource: String,
447    #[serde(default)]
448    #[serde(rename = "EC")]
449    pub(crate) ec: String,
450    #[serde(default)]
451    #[serde(rename = "Key")]
452    pub(crate) key: String,
453}
454
455#[derive(Debug, Clone, PartialEq, Default, Deserialize)]
456pub(crate) struct TempFetchResult {
457    #[serde(default)]
458    #[serde(rename = "ETag")]
459    pub(crate) etag: String,
460    #[serde(default)]
461    #[serde(rename = "SourceContentType")]
462    pub(crate) source_content_type: String,
463    #[serde(default)]
464    #[serde(rename = "SourceContentLength")]
465    pub(crate) source_content_length: i64,
466    #[serde(default)]
467    #[serde(rename = "MD5")]
468    pub(crate) md5: String,
469    #[serde(default)]
470    #[serde(rename = "Code")]
471    pub(crate) code: String,
472    #[serde(default)]
473    #[serde(rename = "Message")]
474    pub(crate) message: String,
475    #[serde(default)]
476    #[serde(rename = "HostId")]
477    pub(crate) host_id: String,
478    #[serde(default)]
479    #[serde(rename = "Resource")]
480    pub(crate) resource: String,
481    #[serde(default)]
482    #[serde(rename = "EC")]
483    pub(crate) ec: String,
484    #[serde(default)]
485    #[serde(rename = "Key")]
486    pub(crate) key: String,
487}
488#[derive(Debug, Clone, PartialEq, Default, Deserialize)]
489pub(crate) struct UserMeta {
490    #[serde(default)]
491    #[serde(rename = "Key")]
492    pub(crate) key: String,
493    #[serde(default)]
494    #[serde(rename = "Value")]
495    pub(crate) value: String,
496}
497
498#[derive(Debug)]
499pub struct RateLimiter {
500    pub(crate) capacity: i64,
501    pub(crate) rate: i64,
502    pub(crate) tokens_checkpoint: Mutex<(i64, DateTime<Utc>)>,
503}
504
505impl PartialEq for RateLimiter {
506    fn eq(&self, other: &Self) -> bool {
507        self.capacity == other.capacity && self.rate == other.rate
508    }
509}
510
511impl RateLimiter {
512    pub fn new(mut capacity: i64, rate: i64) -> Self {
513        if capacity < rate {
514            capacity = rate;
515        }
516        Self {
517            capacity,
518            rate,
519            tokens_checkpoint: Mutex::new((0, Utc::now())),
520        }
521    }
522
523    pub fn acquire(&self, want: i64) -> (bool, Option<Duration>) {
524        if self.capacity <= 0 || want <= 0 {
525            return (true, None);
526        }
527        let mut tokens_checkpoint = self.tokens_checkpoint.lock().unwrap();
528        let now = Utc::now();
529        let delta = now.sub(tokens_checkpoint.1).num_milliseconds() * self.rate / 1000 + 1;
530        let mut tokens = tokens_checkpoint.0;
531        if tokens + delta >= self.capacity {
532            tokens = self.capacity;
533        } else {
534            tokens += delta;
535        }
536
537        if tokens >= want {
538            tokens_checkpoint.0 = tokens - want;
539            tokens_checkpoint.1 = now;
540            return (true, None);
541        }
542
543        tokens_checkpoint.0 = tokens;
544        tokens_checkpoint.1 = now;
545        let mut result = (want - tokens) * 1000 / self.rate + 1;
546        if result < 10 {
547            result = 10;
548        }
549
550        (false, Some(Duration::from_millis(result as u64)))
551    }
552}

/// Kind of event reported by a [`DataTransferStatus`].
#[derive(Debug, Clone, PartialEq)]
pub enum DataTransferType {
    DataTransferStarted,
    DataTransferRW,
    DataTransferSucceed,
    DataTransferFailed,
}

/// Snapshot of a data transfer: which operation/bucket/key it belongs to,
/// byte counters, the event type and the retry count.
///
/// The byte counters are `-1` until they are set explicitly.
#[derive(Debug, Clone, PartialEq)]
pub struct DataTransferStatus {
    pub(crate) operation: String,
    pub(crate) bucket: String,
    pub(crate) key: String,
    pub(crate) consumed_bytes: i64,
    pub(crate) total_bytes: i64,
    pub(crate) rw_once_bytes: i64,
    pub(crate) data_transfer_type: DataTransferType,
    pub(crate) retry_count: isize,
}

impl DataTransferStatus {
    /// Creates a status with empty operation/bucket/key and all byte
    /// counters set to `-1`.
    pub(crate) fn new(data_transfer_type: DataTransferType, retry_count: isize) -> Self {
        Self {
            operation: String::new(),
            bucket: String::new(),
            key: String::new(),
            consumed_bytes: -1,
            total_bytes: -1,
            rw_once_bytes: -1,
            data_transfer_type,
            retry_count,
        }
    }

    /// Builder-style setter for the operation name.
    pub(crate) fn set_operation(mut self, operation: impl Into<String>) -> Self {
        self.operation = operation.into();
        self
    }

    /// Builder-style setter for the bucket name.
    pub(crate) fn set_bucket(mut self, bucket: impl Into<String>) -> Self {
        self.bucket = bucket.into();
        self
    }

    /// Builder-style setter for the key.
    pub(crate) fn set_key(mut self, key: impl Into<String>) -> Self {
        self.key = key.into();
        self
    }

    /// Builder-style setter for the consumed byte count.
    pub(crate) fn set_consumed_bytes(mut self, consumed_bytes: i64) -> Self {
        self.consumed_bytes = consumed_bytes;
        self
    }

    /// Builder-style setter for the total byte count.
    pub(crate) fn set_total_bytes(mut self, total_bytes: i64) -> Self {
        self.total_bytes = total_bytes;
        self
    }

    /// Builder-style setter for the per-read/write byte count.
    pub(crate) fn set_rw_once_bytes(mut self, rw_once_bytes: i64) -> Self {
        self.rw_once_bytes = rw_once_bytes;
        self
    }

    /// The operation name (empty when unset).
    pub fn operation(&self) -> &str {
        &self.operation
    }

    /// The bucket name (empty when unset).
    pub fn bucket(&self) -> &str {
        &self.bucket
    }

    /// The key (empty when unset).
    pub fn key(&self) -> &str {
        &self.key
    }

    /// The consumed byte count (`-1` when unset).
    pub fn consumed_bytes(&self) -> i64 {
        self.consumed_bytes
    }

    /// The total byte count (`-1` when unset).
    pub fn total_bytes(&self) -> i64 {
        self.total_bytes
    }

    /// The per-read/write byte count (`-1` when unset).
    pub fn rw_once_bytes(&self) -> i64 {
        self.rw_once_bytes
    }

    /// The event type.
    pub fn data_transfer_type(&self) -> &DataTransferType {
        &self.data_transfer_type
    }

    /// The retry count.
    pub fn retry_count(&self) -> isize {
        self.retry_count
    }
}
641
642pub trait DataTransferListener {
643    fn data_transfer_listener(&self) -> &Option<Sender<DataTransferStatus>>;
644    fn set_data_transfer_listener(&mut self, listener: impl Into<Sender<DataTransferStatus>>);
645}
646