aliyun_tablestore_rs/
lib.rs

1use std::{collections::HashMap, fmt::Display, str::FromStr, time::Duration};
2
3use base64::{prelude::BASE64_STANDARD, Engine};
4use bytes::Bytes;
5use defined_column::{AddDefinedColumnOperation, AddDefinedColumnRequest, DeleteDefinedColumnOperation, DeleteDefinedColumnRequest};
6use error::OtsError;
7use index::{CreateIndexOperation, CreateIndexRequest, DropIndexOperation};
8use lastpoint_index::{CreateTimeseriesLastpointIndexOperation, CreateTimeseriesLastpointIndexRequest, DeleteTimeseriesLastpointIndexOperation};
9use prost::Message;
10use protos::search::{CreateSearchIndexRequest, UpdateSearchIndexRequest};
11use reqwest::{
12    header::{HeaderMap, HeaderName, HeaderValue},
13    Response,
14};
15
16use analytical_store::{
17    CreateTimeseriesAnalyticalStoreOperation, CreateTimeseriesAnalyticalStoreRequest, DeleteTimeseriesAnalyticalStoreOperation,
18    DeleteTimeseriesAnalyticalStoreRequest, DescribeTimeseriesAnalyticalStoreOperation, UpdateTimeseriesAnalyticalStoreOperation,
19    UpdateTimeseriesAnalyticalStoreRequest,
20};
21use data::{
22    BatchGetRowOperation, BatchGetRowRequest, BatchWriteRowOperation, BatchWriteRowRequest, BulkExportOperation, BulkExportRequest, BulkImportOperation,
23    BulkImportRequest, DeleteRowOperation, DeleteRowRequest, GetRangeOperation, GetRangeRequest, GetRowOperation, GetRowRequest, PutRowOperation,
24    PutRowRequest, UpdateRowOperation, UpdateRowRequest,
25};
26use search::{
27    ComputeSplitsOperation, CreateSearchIndexOperation, DeleteSearchIndexOperation, DescribeSearchIndexOperation, ListSearchIndexOperation,
28    ParallelScanOperation, ParallelScanRequest, SearchOperation, SearchRequest, UpdateSearchIndexOperation,
29};
30use sql::{SqlQueryOperation, SqlQueryRequest};
31use table::{
32    ComputeSplitPointsBySizeOperation, ComputeSplitPointsBySizeRequest, CreateTableOperation, CreateTableRequest, DeleteTableOperation, DescribeTableOperation,
33    ListTableOperation, UpdateTableOperation, UpdateTableRequest,
34};
35use timeseries_data::{
36    DeleteTimeseriesMetaOperation, DeleteTimeseriesMetaRequest, GetTimeseriesDataOperation, GetTimeseriesDataRequest, PutTimeseriesDataOperation,
37    PutTimeseriesDataRequest, QueryTimeseriesMetaOperation, QueryTimeseriesMetaRequest, ScanTimeseriesDataOperation, ScanTimeseriesDataRequest,
38    SplitTimeseriesScanTaskOperation, SplitTimeseriesScanTaskRequest, UpdateTimeseriesMetaOperation, UpdateTimeseriesMetaRequest,
39};
40use timeseries_table::{
41    CreateTimeseriesTableOperation, CreateTimeseriesTableRequest, DeleteTimeseriesTableOperation, DescribeTimeseriesTableOperation,
42    ListTimeseriesTableOperation, UpdateTimeseriesTableOperation, UpdateTimeseriesTableRequest,
43};
44use url::Url;
45use util::{get_iso8601_date_time_string, hmac_sha256};
46
47pub mod analytical_store;
48pub mod crc8;
49pub mod data;
50pub mod defined_column;
51pub mod error;
52pub mod index;
53pub mod lastpoint_index;
54pub mod macros;
55pub mod model;
56pub mod protos;
57pub mod search;
58pub mod sql;
59pub mod table;
60pub mod timeseries_data;
61pub mod timeseries_model;
62pub mod timeseries_table;
63pub mod util;
64
65#[cfg(test)]
66pub mod test_util;
67
68const USER_AGENT: &str = "aliyun-tablestore-rs/0.1.2";
69const HEADER_API_VERSION: &str = "x-ots-apiversion";
70const HEADER_ACCESS_KEY_ID: &str = "x-ots-accesskeyid";
71const HEADER_CONTENT_MD5: &str = "x-ots-contentmd5";
72const HEADER_SIGNATURE: &str = "x-ots-signature";
73const HEADER_DATE: &str = "x-ots-date";
74const HEADER_STS_TOKEN: &str = "x-ots-ststoken";
75const HEADER_SIGN_REGION: &str = "x-ots-signregion";
76const HEADER_SIGN_DATE: &str = "x-ots-signdate";
77const HEADER_INSTANCE_NAME: &str = "x-ots-instancename";
78const HEADER_SIGNATURE_V4: &str = "x-ots-signaturev4";
79
80const API_VERSION: &str = "2015-12-31";
81
82pub type OtsResult<T> = Result<T, OtsError>;
83
84#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
85pub enum OtsOp {
86    #[default]
87    Undefined,
88
89    // tables
90    CreateTable,
91    UpdateTable,
92    ListTable,
93    DescribeTable,
94    DeleteTable,
95    ComputeSplitPointsBySize,
96
97    // defined columns
98    AddDefinedColumn,
99    DeleteDefinedColumn,
100
101    // Data operations
102    GetRow,
103    GetRange,
104    PutRow,
105    UpdateRow,
106    DeleteRow,
107    BatchGetRow,
108    BatchWriteRow,
109    BulkImport,
110    BulkExport,
111
112    // stream operations
113    ListStream,
114    DescribeStream,
115    GetShardIterator,
116    GetStreamRecord,
117
118    // index operations
119    CreateIndex,
120    DropIndex,
121
122    // timeseries table operations.
123    CreateTimeseriesTable,
124    ListTimeseriesTable,
125    DescribeTimeseriesTable,
126    UpdateTimeseriesTable,
127    DeleteTimeseriesTable,
128
129    // timeseries table data operations
130    PutTimeseriesData,
131    GetTimeseriesData,
132    UpdateTimeseriesMeta,
133    QueryTimeseriesMeta,
134    DeleteTimeseriesMeta,
135    SplitTimeseriesScanTask,
136    ScanTimeseriesData,
137
138    // timeseries lastpoint index
139    CreateTimeseriesLastpointIndex,
140    DeleteTimeseriesLastpointIndex,
141
142    // timeseries table analyzing operations
143    CreateTimeseriesAnalyticalStore,
144    UpdateTimeseriesAnalyticalStore,
145    DescribeTimeseriesAnalyticalStore,
146    DeleteTimeseriesAnalyticalStore,
147
148    // search index operations
149    CreateSearchIndex,
150    UpdateSearchIndex,
151    ListSearchIndex,
152    DescribeSearchIndex,
153    DeleteSearchIndex,
154    Search,
155    ComputeSplits,
156    ParallelScan,
157
158    // tunnel operations
159    CreateTunnel,
160    ListTunnel,
161    DescribeTunnel,
162    DeleteTunnel,
163
164    SQLQuery,
165}
166
167impl From<OtsOp> for String {
168    fn from(value: OtsOp) -> Self {
169        value.to_string()
170    }
171}
172
173impl Display for OtsOp {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        let s = match self {
176            OtsOp::Undefined => "_Undefined_",
177
178            OtsOp::CreateTable => "CreateTable",
179            OtsOp::UpdateTable => "UpdateTable",
180            OtsOp::ListTable => "ListTable",
181            OtsOp::DescribeTable => "DescribeTable",
182            OtsOp::DeleteTable => "DeleteTable",
183            OtsOp::ComputeSplitPointsBySize => "ComputeSplitPointsBySize",
184
185            OtsOp::AddDefinedColumn => "AddDefinedColumn",
186            OtsOp::DeleteDefinedColumn => "DeleteDefinedColumn",
187
188            OtsOp::GetRow => "GetRow",
189            OtsOp::GetRange => "GetRange",
190            OtsOp::PutRow => "PutRow",
191            OtsOp::UpdateRow => "UpdateRow",
192            OtsOp::DeleteRow => "DeleteRow",
193            OtsOp::BatchGetRow => "BatchGetRow",
194            OtsOp::BatchWriteRow => "BatchWriteRow",
195            OtsOp::BulkImport => "BulkImport",
196            OtsOp::BulkExport => "BulkExport",
197
198            OtsOp::CreateTunnel => "CreateTunnel",
199            OtsOp::ListTunnel => "ListTunnel",
200            OtsOp::DescribeTunnel => "DescribeTunnel",
201            OtsOp::DeleteTunnel => "DeleteTunnel",
202
203            OtsOp::ListStream => "ListStream",
204            OtsOp::DescribeStream => "DescribeStream",
205            OtsOp::GetShardIterator => "GetShardIterator",
206            OtsOp::GetStreamRecord => "GetStreamRecord",
207
208            OtsOp::CreateIndex => "CreateIndex",
209            OtsOp::DropIndex => "DropIndex",
210
211            OtsOp::CreateTimeseriesTable => "CreateTimeseriesTable",
212            OtsOp::ListTimeseriesTable => "ListTimeseriesTable",
213            OtsOp::DescribeTimeseriesTable => "DescribeTimeseriesTable",
214            OtsOp::UpdateTimeseriesTable => "UpdateTimeseriesTable",
215            OtsOp::DeleteTimeseriesTable => "DeleteTimeseriesTable",
216
217            OtsOp::PutTimeseriesData => "PutTimeseriesData",
218            OtsOp::GetTimeseriesData => "GetTimeseriesData",
219            OtsOp::UpdateTimeseriesMeta => "UpdateTimeseriesMeta",
220            OtsOp::QueryTimeseriesMeta => "QueryTimeseriesMeta",
221            OtsOp::DeleteTimeseriesMeta => "DeleteTimeseriesMeta",
222            OtsOp::SplitTimeseriesScanTask => "SplitTimeseriesScanTask",
223            OtsOp::ScanTimeseriesData => "ScanTimeseriesData",
224
225            OtsOp::CreateTimeseriesLastpointIndex => "CreateTimeseriesLastpointIndex",
226            OtsOp::DeleteTimeseriesLastpointIndex => "DeleteTimeseriesLastpointIndex",
227
228            OtsOp::CreateTimeseriesAnalyticalStore => "CreateTimeseriesAnalyticalStore",
229            OtsOp::UpdateTimeseriesAnalyticalStore => "UpdateTimeseriesAnalyticalStore",
230            OtsOp::DescribeTimeseriesAnalyticalStore => "DescribeTimeseriesAnalyticalStore",
231            OtsOp::DeleteTimeseriesAnalyticalStore => "DeleteTimeseriesAnalyticalStore",
232
233            OtsOp::CreateSearchIndex => "CreateSearchIndex",
234            OtsOp::UpdateSearchIndex => "UpdateSearchIndex",
235            OtsOp::ListSearchIndex => "ListSearchIndex",
236            OtsOp::DescribeSearchIndex => "DescribeSearchIndex",
237            OtsOp::DeleteSearchIndex => "DeleteSearchIndex",
238            OtsOp::Search => "Search",
239            OtsOp::ComputeSplits => "ComputeSplits",
240            OtsOp::ParallelScan => "ParallelScan",
241
242            OtsOp::SQLQuery => "SQLQuery",
243        };
244
245        write!(f, "{}", s)
246    }
247}
248
249impl OtsOp {
250    /// 检测一个操作是否是幂等的
251    pub fn is_idempotent(&self) -> bool {
252        matches!(
253            self,
254            Self::ListTable
255                | Self::DescribeTable
256                | Self::GetRow
257                | Self::GetRange
258                | Self::BatchGetRow
259                | Self::BulkExport
260                | Self::ListStream
261                | Self::DescribeStream
262                | Self::GetShardIterator
263                | Self::ComputeSplitPointsBySize
264                | Self::GetTimeseriesData
265                | Self::QueryTimeseriesMeta
266                | Self::ListTimeseriesTable
267                | Self::DescribeTimeseriesTable
268                | Self::ScanTimeseriesData
269                | Self::DescribeTimeseriesAnalyticalStore
270                | Self::ParallelScan
271                | Self::ComputeSplits
272                | Self::ListTunnel
273                | Self::DescribeTunnel
274                | Self::SQLQuery
275        )
276    }
277}
278
279#[derive(Debug, Default, Clone)]
280pub struct OtsRequestOptions {
281    pub timeout_ms: Option<u64>,
282}
283
284/// OTS API 请求结构体
285#[allow(dead_code)]
286#[derive(Debug, Clone)]
287pub struct OtsRequest {
288    method: reqwest::Method,
289    operation: OtsOp,
290    headers: HashMap<String, String>,
291    query: HashMap<String, String>,
292    body: Vec<u8>,
293    options: OtsRequestOptions,
294}
295
296impl Default for OtsRequest {
297    fn default() -> Self {
298        Self {
299            method: reqwest::Method::POST,
300            operation: OtsOp::Undefined,
301            headers: HashMap::new(),
302            query: HashMap::new(),
303            body: Vec::new(),
304            options: OtsRequestOptions::default(),
305        }
306    }
307}
308
309/// 重试策略。需要注意的是 `clone_box` 方法中用来重置策略的状态数据(如果有的话)
310pub trait RetryPolicy: std::fmt::Debug + Send + Sync {
311    /// 是否需要重试。参数分别表示重试次数、操作和发生的错误
312    fn should_retry(&self, retried: u32, op: OtsOp, ots_error: &OtsError) -> bool;
313
314    /// 如果需要重试,重试之前让线程等待的时间
315    fn delay_ms(&self) -> u32;
316
317    /// 需要自行实现克隆逻辑。一般来说就是需要重置一些记录参数,为下一次全新的请求做准备
318    fn clone_box(&self) -> Box<dyn RetryPolicy>;
319}
320
321impl Clone for Box<dyn RetryPolicy> {
322    fn clone(&self) -> Box<dyn RetryPolicy> {
323        self.clone_box()
324    }
325}
326
327/// 默认重试机制,做多重试 10 次(加上最开始的 1 次,总计就是发送 11 次请求)。
328/// 两次重试之间休眠 10 秒
329#[derive(Debug, Copy, Clone)]
330pub struct DefaultRetryPolicy {
331    pub max_retry_times: u32,
332}
333
334impl Default for DefaultRetryPolicy {
335    fn default() -> Self {
336        Self { max_retry_times: 10 }
337    }
338}
339
340impl DefaultRetryPolicy {
341    /// 无论是什么操作,只要是这些错误码,就重试
342    const RETRY_NO_MATTER_ACTIONS_ERR_CODES: &[&'static str] = &[
343        "OTSRowOperationConflict",
344        "OTSNotEnoughCapacityUnit",
345        "OTSTableNotReady",
346        "OTSPartitionUnavailable",
347        "OTSServerBusy",
348    ];
349
350    const ERR_OTS_QUOTA_EXHAUSTED_MSG: &str = "Too frequent table operations.";
351
352    // 仅针对幂等的操作,如果遇到这些错误码,重试
353    const RETRY_FOR_IDEMPOTENT_ACTIONS_ERR_CODES: &[&'static str] =
354        &["OTSTimeout", "OTSInternalServerError", "OTSServerUnavailable", "OTSTunnelServerUnavailable"];
355
356    fn should_retry_inner(&self, retried: u32, op: OtsOp, ots_error: &OtsError) -> bool {
357        if retried >= self.max_retry_times {
358            log::info!("max retry reached {} times for operation {} with error {}", self.max_retry_times, op, ots_error);
359            return false;
360        }
361
362        match ots_error {
363            // 网络请求错误,重试
364            OtsError::ReqwestError(_) => true,
365
366            // 5xx 的状态码 + 幂等操作,重试
367            OtsError::StatusError(code, _) => code.is_server_error() && op.is_idempotent(),
368
369            // API 错误, OTSQuotaExhausted 错误码 + 固定的错误消息,重试
370            OtsError::ApiError(api_error)
371                if api_error.code == "OTSQuotaExhausted" && api_error.message == Some(Self::ERR_OTS_QUOTA_EXHAUSTED_MSG.to_string()) =>
372            {
373                true
374            }
375
376            // 其他的就是无论什么操作都重试的错误,以及幂等操作对应的错误码
377            OtsError::ApiError(api_error) => {
378                (Self::RETRY_NO_MATTER_ACTIONS_ERR_CODES.contains(&api_error.code.as_str()))
379                    || (op.is_idempotent() && Self::RETRY_FOR_IDEMPOTENT_ACTIONS_ERR_CODES.contains(&api_error.code.as_str()))
380            }
381
382            _ => false,
383        }
384    }
385}
386
387impl RetryPolicy for DefaultRetryPolicy {
388    fn should_retry(&self, retried: u32, op: OtsOp, ots_error: &OtsError) -> bool {
389        self.should_retry_inner(retried, op, ots_error)
390    }
391
392    fn clone_box(&self) -> Box<dyn RetryPolicy> {
393        Box::new(DefaultRetryPolicy::default())
394    }
395
396    fn delay_ms(&self) -> u32 {
397        10000
398    }
399}
400
401/// 客户端构建器
402///
403/// # Examples
404///
405/// ```
406/// let builder = OtsClient.builder("aid", "asec").endpoint("").instance_name("");
407/// let client = builder.build();
408/// ```
409#[derive(Clone)]
410pub struct OtsClientBuilder {
411    access_key_id: String,
412    access_key_secret: String,
413    sts_token: Option<String>,
414    retry_policy: Box<dyn RetryPolicy>,
415    region: String,
416    instance_name: String,
417    endpoint: String,
418    http_client: Option<reqwest::Client>,
419}
420
421impl OtsClientBuilder {
422    pub fn new(ak_id: impl AsRef<str>, ak_sec: impl AsRef<str>) -> Self {
423        Self {
424            access_key_id: ak_id.as_ref().to_string(),
425            access_key_secret: ak_sec.as_ref().to_string(),
426            retry_policy: Box::new(DefaultRetryPolicy::default()),
427            sts_token: None,
428            region: String::new(),
429            instance_name: String::new(),
430            endpoint: String::new(),
431            http_client: None,
432        }
433    }
434
435    /// 设置 STS Token
436    pub fn sts_token(mut self, token: impl AsRef<str>) -> Self {
437        self.sts_token = Some(token.as_ref().to_string());
438
439        self
440    }
441
442    /// 设置重试策略
443    pub fn rety_policy(mut self, policy: Box<dyn RetryPolicy>) -> Self {
444        self.retry_policy = policy;
445
446        self
447    }
448
449    /// 设置地域
450    pub fn region(mut self, region: impl AsRef<str>) -> Self {
451        self.region = region.as_ref().to_string();
452
453        self
454    }
455
456    /// 设置实例名称
457    pub fn instance_name(mut self, instance_name: impl AsRef<str>) -> Self {
458        self.instance_name = instance_name.as_ref().to_string();
459
460        self
461    }
462
463    /// 设置 endpoint。例如:`https://instance-name.cn-beijing.ots.aliyuncs.com`
464    pub fn endpoint(mut self, endpoint: impl AsRef<str>) -> Self {
465        self.endpoint = endpoint.as_ref().to_string();
466
467        self
468    }
469
470    /// 设置自定义的 Client
471    pub fn http_client(mut self, client: reqwest::Client) -> Self {
472        self.http_client = Some(client);
473
474        self
475    }
476
477    /// 获取重试策略的可写引用
478    pub fn retry_policy_mut(&mut self) -> &mut Box<dyn RetryPolicy> {
479        &mut self.retry_policy
480    }
481
482    pub fn build(self) -> OtsClient {
483        let Self {
484            access_key_id,
485            access_key_secret,
486            sts_token,
487            retry_policy,
488            region,
489            instance_name,
490            endpoint,
491            http_client,
492        } = self;
493
494        OtsClient {
495            access_key_id,
496            access_key_secret,
497            sts_token,
498            region,
499            instance_name,
500            endpoint,
501            http_client: http_client.unwrap_or(reqwest::Client::new()),
502            retry_policy,
503        }
504    }
505}
506
507/// 客户端
508#[allow(dead_code)]
509#[derive(Clone)]
510pub struct OtsClient {
511    access_key_id: String,
512    access_key_secret: String,
513    sts_token: Option<String>,
514    region: String,
515    instance_name: String,
516    endpoint: String,
517    http_client: reqwest::Client,
518    retry_policy: Box<dyn RetryPolicy>,
519}
520
521impl std::fmt::Debug for OtsClient {
522    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
523        f.debug_struct("OtsClient")
524            .field("access_key_id", &self.access_key_id)
525            .field("region", &self.region)
526            .field("instance_name", &self.instance_name)
527            .field("endpoint", &self.endpoint)
528            .field("http_client", &self.http_client)
529            .finish()
530    }
531}
532
533impl OtsClient {
534    fn parse_instance_and_region(endpoint: &str) -> (&str, &str) {
535        let s = endpoint.strip_prefix("http://").unwrap_or(endpoint);
536        let s = s.strip_prefix("https://").unwrap_or(s);
537        let parts = s.split(".").collect::<Vec<_>>();
538        if parts.len() < 2 {
539            panic!("can not parse instance name and region from endpoint: {}", endpoint);
540        }
541
542        (parts[0], parts[1])
543    }
544
545    /// Build an OtsClient from env values. The following env vars are required:
546    ///
547    /// - `ALIYUN_OTS_AK_ID`: The access key id.
548    /// - `ALIYUN_OTS_AK_SEC`: The access key secret
549    /// - `ALIYUN_OTS_ENDPOINT`: The tablestore instance endpoint. e.g. `https://${instance-name}.cn-beijing.ots.aliyuncs.com`
550    pub fn from_env() -> Self {
551        let access_key_id = std::env::var("ALIYUN_OTS_AK_ID").expect("env var ALI_ACCESS_KEY_ID is missing");
552        let access_key_secret = std::env::var("ALIYUN_OTS_AK_SEC").expect("env var ALI_ACCESS_KEY_SECRET is missing");
553        let endpoint = std::env::var("ALIYUN_OTS_ENDPOINT").expect("env var ALI_OSS_ENDPOINT is missing");
554        let endpoint = endpoint.to_lowercase();
555        let (instance_name, region) = Self::parse_instance_and_region(endpoint.as_str());
556
557        Self {
558            access_key_id,
559            access_key_secret,
560            sts_token: None,
561            region: region.to_string(),
562            instance_name: instance_name.to_string(),
563            endpoint,
564            http_client: reqwest::Client::new(),
565            retry_policy: Box::new(DefaultRetryPolicy::default()),
566        }
567    }
568
569    /// 使用 AK_ID、AK_SEC 和网络访问地址构建实例
570    ///
571    /// # Arguments
572    ///
573    /// - `ak_id`: Access key id
574    /// - `ak_sec`: Access key secret
575    /// - `endpoint`: 服务地址
576    pub fn new(ak_id: impl AsRef<str>, ak_sec: impl AsRef<str>, endpoint: impl AsRef<str>) -> Self {
577        let endpoint = endpoint.as_ref().to_lowercase();
578
579        let (instance_name, region) = Self::parse_instance_and_region(endpoint.as_str());
580
581        Self {
582            access_key_id: ak_id.as_ref().to_string(),
583            access_key_secret: ak_sec.as_ref().to_string(),
584            region: region.to_string(),
585            instance_name: instance_name.to_string(),
586            endpoint,
587            http_client: reqwest::Client::new(),
588            sts_token: None,
589            retry_policy: Box::new(DefaultRetryPolicy::default()),
590        }
591    }
592
593    /// 客户端构建器
594    ///
595    /// # Arguments
596    ///
597    /// - `ak_id`: Access key id
598    /// - `ak_sec`: Access key secret
599    pub fn builder(ak_id: impl AsRef<str>, ak_sec: impl AsRef<str>) -> OtsClientBuilder {
600        OtsClientBuilder::new(ak_id, ak_sec)
601    }
602
603    /// V2 版本签名,直接填充请求头 Map
604    fn fill_signature_v2(&self, operation: &str, headers: &mut HashMap<String, String>) {
605        let date_time_string = get_iso8601_date_time_string();
606        let date = &date_time_string[..10].replace("-", "");
607
608        headers.insert("user-agent".to_string(), USER_AGENT.to_string());
609        headers.insert(HEADER_API_VERSION.to_string(), API_VERSION.to_string());
610        headers.insert(HEADER_DATE.to_string(), date_time_string.clone());
611        headers.insert(HEADER_SIGN_DATE.to_string(), date.to_string());
612        headers.insert(HEADER_ACCESS_KEY_ID.to_string(), self.access_key_id.clone());
613        headers.insert(HEADER_INSTANCE_NAME.to_string(), self.instance_name.clone());
614        headers.insert(HEADER_SIGN_REGION.to_string(), self.region.clone());
615
616        if let Some(s) = &self.sts_token {
617            headers.insert(HEADER_STS_TOKEN.to_string(), s.to_string());
618        }
619
620        let mut canonical_headers = headers
621            .iter()
622            .map(|(k, v)| (k.to_lowercase(), v))
623            .filter(|(k, _)| k.starts_with("x-ots-") && k != HEADER_SIGNATURE)
624            .map(|(k, v)| format!("{}:{}", k, v))
625            .collect::<Vec<_>>();
626        canonical_headers.sort();
627
628        let canonical_headers = canonical_headers.join("\n");
629
630        let string_to_sign = format!("/{}\nPOST\n\n{}\n", operation, canonical_headers);
631
632        log::debug!("string to sign: \n-----\n{}\n-----", string_to_sign);
633        let sig = util::hmac_sha1(self.access_key_secret.as_bytes(), string_to_sign.as_bytes());
634        let sig_string = BASE64_STANDARD.encode(&sig);
635
636        log::debug!("signature = {}", sig_string);
637
638        headers.insert(HEADER_SIGNATURE.to_string(), sig_string);
639    }
640
641    /// V4 版本签名,没调通。阿里云的人建议线用 V2 吧
642    #[allow(dead_code)]
643    fn fill_signature_v4(&self, operation: &str, headers: &mut HashMap<String, String>) {
644        let date_time_string = get_iso8601_date_time_string();
645        let date_string = &date_time_string[..10].replace("-", "");
646
647        headers.insert("user-agent".to_string(), USER_AGENT.to_string());
648        headers.insert(HEADER_API_VERSION.to_string(), API_VERSION.to_string());
649        headers.insert(HEADER_DATE.to_string(), date_time_string.clone());
650        headers.insert(HEADER_SIGN_DATE.to_string(), date_string.to_string());
651        headers.insert(HEADER_ACCESS_KEY_ID.to_string(), self.access_key_id.clone());
652        headers.insert(HEADER_INSTANCE_NAME.to_string(), self.instance_name.clone());
653        headers.insert(HEADER_SIGN_REGION.to_string(), self.region.clone());
654
655        if let Some(s) = &self.sts_token {
656            headers.insert(HEADER_STS_TOKEN.to_string(), s.to_string());
657        }
658
659        let mut canonical_headers = headers
660            .iter()
661            .map(|(k, v)| (k.to_lowercase(), v))
662            .filter(|(k, _)| k.starts_with("x-ots-") && k != HEADER_SIGNATURE_V4)
663            .map(|(k, v)| format!("{}:{}", k, v))
664            .collect::<Vec<_>>();
665        canonical_headers.sort();
666
667        let canonical_headers = canonical_headers.join("\n");
668
669        let string_to_sign = format!("/{}\nPOST\n\n{}\nots", operation, canonical_headers);
670
671        log::debug!("string to sign: \n-----\n{}\n----", string_to_sign);
672
673        let sign = hmac_sha256(self.access_key_secret.as_bytes(), string_to_sign.as_bytes());
674
675        headers.insert(HEADER_SIGNATURE_V4.to_string(), BASE64_STANDARD.encode(sign));
676    }
677
678    /// 发送请求
679    pub async fn send(&self, req: OtsRequest) -> OtsResult<Response> {
680        let OtsRequest {
681            method,
682            operation,
683            mut headers,
684            query: _,
685            body,
686            options,
687        } = req;
688
689        // 不会发生变化的请求头
690        headers.insert("content-lenght".to_string(), format!("{}", body.len()));
691        let content_md5_base64 = BASE64_STANDARD.encode(md5::compute(&body).as_slice());
692        headers.insert(HEADER_CONTENT_MD5.to_string(), content_md5_base64);
693
694        let url = Url::parse(format!("{}/{}", self.endpoint, operation).as_str()).unwrap();
695        let request_body = Bytes::from_owner(body);
696        let mut retried = 0u32;
697
698        loop {
699            self.fill_signature_v2(&operation.to_string(), &mut headers);
700
701            let mut header_map = HeaderMap::new();
702            headers.iter().for_each(|(k, v)| {
703                log::debug!(">> header: {}: {}", k, v);
704                header_map.insert(HeaderName::from_str(&k.to_lowercase()).unwrap(), HeaderValue::from_str(v).unwrap());
705            });
706
707            let mut request_builder = self
708                .http_client
709                .request(method.clone(), url.clone())
710                .headers(header_map.clone())
711                .body(request_body.clone());
712
713            // Handle per-request options
714            if let Some(ms) = options.timeout_ms {
715                request_builder = request_builder.timeout(Duration::from_millis(ms));
716            }
717
718            let response = request_builder.send().await?;
719
720            response.headers().iter().for_each(|(k, v)| {
721                log::debug!("<< header: {}: {}", k, v.to_str().unwrap());
722            });
723
724            if response.status().is_success() {
725                return Ok(response);
726            }
727
728            if !&response.status().is_success() {
729                let status = response.status();
730
731                let e = match response.bytes().await {
732                    Ok(bytes) => {
733                        let api_error = protos::Error::decode(bytes)?;
734                        OtsError::ApiError(Box::new(api_error))
735                    }
736                    Err(_) => OtsError::StatusError(status, "".to_string()),
737                };
738
739                log::error!("api call failed, check retry against retry policy for operation {} and error {}", operation, e);
740                let should_retry = self.retry_policy.should_retry(retried, operation, &e);
741                log::info!("should retry: {} for operation {} with error {}", should_retry, operation, e);
742
743                if !should_retry {
744                    return Err(e);
745                }
746
747                let next_delay = self.retry_policy.delay_ms();
748                log::info!("delay for {} ms to retry", next_delay);
749                tokio::time::sleep(tokio::time::Duration::from_millis(next_delay as u64)).await;
750
751                retried += 1;
752            }
753        }
754    }
755
756    /// 列出实例下的宽表
757    pub fn list_table(&self) -> ListTableOperation {
758        ListTableOperation::new(self.clone())
759    }
760
761    /// 创建一个宽表
762    ///
763    /// # Examples
764    ///
765    /// ```
766    /// let req = CreateTableRequest::new("users")
767    ///     .primary_key_string("user_id_part")
768    ///     .primary_key_string("user_id")
769    ///     .column_string("full_name")
770    ///     .column_string("phone_number")
771    ///     .column_string("pwd_hash")
772    ///     .column_string("badge_no")
773    ///     .column_string("gender")
774    ///     .column_integer("registered_at_ms")
775    ///     .column_bool("deleted")
776    ///     .column_integer("deleted_at_ms")
777    ///     .column_double("score")
778    ///     .column_blob("avatar")
779    ///     .index(
780    ///         IndexMetaBuilder::new("idx_phone_no")
781    ///             .primary_key("user_id_part")
782    ///             .defined_column("phone_number")
783    ///             .index_type(IndexType::ItGlobalIndex)
784    ///             .build(),
785    ///     );
786    /// let response = client.create_table(req).send().await;
787    /// ```
788    pub fn create_table(&self, request: CreateTableRequest) -> CreateTableOperation {
789        CreateTableOperation::new(self.clone(), request)
790    }
791
792    /// 更新宽表定义
793    pub fn update_table(&self, request: UpdateTableRequest) -> UpdateTableOperation {
794        UpdateTableOperation::new(self.clone(), request)
795    }
796
797    /// 获取宽表定义
798    pub fn describe_table(&self, table_name: &str) -> DescribeTableOperation {
799        DescribeTableOperation::new(self.clone(), table_name)
800    }
801
802    /// 删除宽表
803    pub fn delete_table(&self, table_name: &str) -> DeleteTableOperation {
804        DeleteTableOperation::new(self.clone(), table_name)
805    }
806
807    /// 计算宽表分裂点
808    pub fn compute_split_points_by_size(&self, request: ComputeSplitPointsBySizeRequest) -> ComputeSplitPointsBySizeOperation {
809        ComputeSplitPointsBySizeOperation::new(self.clone(), request)
810    }
811
812    /// 添加预定义列
813    ///
814    /// # Examples
815    ///
816    /// ```
817    /// let response = client
818    ///     .add_defined_column(
819    ///         AddDefinedColumnRequest::new("ccs")
820    ///             .column_integer("created_at")
821    ///             .column_string("cover_url")
822    ///             .column_double("avg_score"),
823    ///     )
824    ///     .send()
825    ///     .await;
826    /// ```
827    pub fn add_defined_column(&self, request: AddDefinedColumnRequest) -> AddDefinedColumnOperation {
828        AddDefinedColumnOperation::new(self.clone(), request)
829    }
830
831    /// 删除预定义列
832    ///
833    /// # Example
834    ///
835    /// ```
836    /// let response = client
837    ///     .delete_defined_column(DeleteDefinedColumnRequest::new("ccs").column("created_at"))
838    ///     .send()
839    ///     .await;
840    /// ```
841    pub fn delete_defined_column(&self, request: DeleteDefinedColumnRequest) -> DeleteDefinedColumnOperation {
842        DeleteDefinedColumnOperation::new(self.clone(), request)
843    }
844
845    /// 根据主键获取单行数据
846    ///
847    /// # Examples
848    ///
849    /// ```
850    /// let response = client
851    ///     .get_row(
852    ///         GetRowRequest::new("schools")
853    ///             .primary_key_string("school_id", "00020FFB-BB14-CCAD-0181-A929E71C7312")
854    ///             .primary_key_integer("id", 1742203524276000)
855    ///             .max_versions(1),
856    ///     )
857    ///     .send()
858    ///     .await;
859    /// ```
860    pub fn get_row(&self, request: GetRowRequest) -> GetRowOperation {
861        GetRowOperation::new(self.clone(), request)
862    }
863
864    /// 根据主键获取范围数据
865    ///
866    /// # Examples
867    ///
868    /// ## 依次设置开始主键和结束主键
869    ///
870    /// ```
871    /// let response = client.get_range(
872    ///     GetRangeRequest::new("table_name")
873    ///         .start_primary_key_string("id", "string_id_value")
874    ///         .start_primary_key_inf_min("long_id")
875    ///         .end_primary_key_string("id", "string_id_value")
876    ///         .end_primary_key_inf_max("long_id")
877    ///         .direction(Direction::Forward)
878    /// ).send().await;
879    /// ```
880    ///
881    /// ## 依次设置每个主键的开始和结束值
882    ///
883    /// ```
884    /// let response = client.get_range(
885    ///     GetRangeRequest::new("table_name").primary_key_range(
886    ///         "id",
887    ///         PrimaryKeyValue::String("string_id_value".to_string()),
888    ///         PrimaryKeyValue::String("string_id_value".to_string())
889    ///     ).primary_key_range(
890    ///         "long_id",
891    ///         PrimaryKeyValue::Integer(12345678),
892    ///         PrimaryKeyValue::InfMax
893    ///     ).direction(Direction::Forward)
894    /// ).send().await;
895    /// ```
896    pub fn get_range(&self, request: GetRangeRequest) -> GetRangeOperation {
897        GetRangeOperation::new(self.clone(), request)
898    }
899
900    /// 插入一行数据
901    ///
902    /// # Examples
903    ///
904    /// ```
905    /// let row = Row::default()
906    ///     .primary_key_string("school_id", &school_id)
907    ///     .primary_key_auto_increment("id")
908    ///     .column_string("name", Name(ZH_CN).fake::<String>())
909    ///     .column_string("province", Name(ZH_CN).fake::<String>());
910    ///
911    /// let response = client
912    ///     .put_row(
913    ///         PutRowRequest::new("schools").row(row).return_type(ReturnType::RtPk)
914    ///     ).send().await.unwrap();
915    /// ```
916    pub fn put_row(&self, request: PutRowRequest) -> PutRowOperation {
917        PutRowOperation::new(self.clone(), request)
918    }
919
920    /// 更新一行数据
921    ///
922    /// # Examples
923    ///
924    /// ```
925    /// let response = client
926    ///     .update_row(
927    ///         UpdateRowRequest::new(table_name)
928    ///             .row(
929    ///                 Row::new()
930    ///                     .primary_key_string("str_id", &id)
931    ///                     .column_string("str_col", "b")
932    ///                     .column_to_increse("int_col", 1)
933    ///                     .column_bool("bool_col", true)
934    ///                     .column_to_delete_all_versions("blob_col"),
935    ///             )
936    ///             .return_type(ReturnType::RtPk),
937    ///     )
938    ///     .send()
939    ///     .await;
940    /// ```
941    pub fn update_row(&self, request: UpdateRowRequest) -> UpdateRowOperation {
942        UpdateRowOperation::new(self.clone(), request)
943    }
944
945    /// 根据主键删除数据行
946    ///
947    /// # Examples
948    ///
949    /// ```
950    /// client.delete_row(
951    ///     DeleteRowRequest::new(table_name).primary_key_string("str_id", &id)
952    /// ).send().await;
953    /// ```
954    pub fn delete_row(&self, request: DeleteRowRequest) -> DeleteRowOperation {
955        DeleteRowOperation::new(self.clone(), request)
956    }
957
958    /// 批量读取一个表或多个表中的若干行数据
959    ///
960    /// # Examples
961    ///
962    /// ```
963    /// let client = OtsClient::from_env();
964    ///
965    /// let t1 = TableInBatchGetRowRequest::new("data_types")
966    ///     .primary_key(
967    ///         PrimaryKey::new().column_string("str_id", "1")
968    ///     ).primary_key(
969    ///         PrimaryKey::new().column_string("str_id", "02421870-56d8-4429-a548-27e0e1f42894")
970    ///     );
971    ///
972    /// let t2 = TableInBatchGetRowRequest::new("schools").primary_key(
973    ///     PrimaryKey::new().column_string("school_id", "00020FFB-BB14-CCAD-0181-A929E71C7312")
974    ///         .column_integer("id", 1742203524276000)
975    /// );
976    ///
977    /// let request = BatchGetRowRequest::new().tables(
978    ///     vec![t1, t2]
979    /// );
980    ///
981    /// let res = client.batch_get_row(request).send().await;
982    /// ```
983    pub fn batch_get_row(&self, request: BatchGetRowRequest) -> BatchGetRowOperation {
984        BatchGetRowOperation::new(self.clone(), request)
985    }
986
987    /// 接口批量插入、修改或删除一个或多个表中的若干行数据。
988    ///
989    /// # Examples
990    ///
991    /// ```
992    /// let client = OtsClient::from_env();
993    ///
994    /// let uuid: String = UUIDv4.fake();
995    ///
996    /// let t1 = TableInBatchWriteRowRequest::new("data_types").rows(vec![
997    ///     RowInBatchWriteRowRequest::put_row(
998    ///         Row::new()
999    ///             .primary_key_column_string("str_id", &uuid)
1000    ///             .column_string("str_col", "column is generated from batch writing"),
1001    ///     ),
1002    ///     RowInBatchWriteRowRequest::delete_row(Row::new().primary_key_column_string("str_id", "266e79aa-eb74-47d8-9658-e17d52fc012d")),
1003    ///     RowInBatchWriteRowRequest::update_row(
1004    ///         Row::new()
1005    ///             .primary_key_column_string("str_id", "975e9e17-f969-4387-9cef-a6ae9849a10d")
1006    ///             .column_double("double_col", 11.234),
1007    ///     ),
1008    /// ]);
1009    ///
1010    /// let t2 = TableInBatchWriteRowRequest::new("schools").rows(vec![RowInBatchWriteRowRequest::update_row(
1011    ///     Row::new()
1012    ///         .primary_key_column_string("school_id", "2")
1013    ///         .primary_key_column_integer("id", 1742378007415000)
1014    ///         .column_string("name", "School-AAAA"),
1015    /// )]);
1016    ///
1017    /// let req = BatchWriteRowRequest::new().table(t1).table(t2);
1018    ///
1019    /// let res = client.batch_write_row(req).send().await;
1020    /// ```
1021    pub fn batch_write_row(&self, request: BatchWriteRowRequest) -> BatchWriteRowOperation {
1022        BatchWriteRowOperation::new(self.clone(), request)
1023    }
1024
1025    /// 批量写入数据。写入数据时支持插入一行数据、修改行数据以及删除行数据。最多一次 200 行
1026    ///
1027    /// # Examples
1028    ///
1029    /// ```
1030    /// let client = OtsClient::from_env();
1031    /// let mut req = BulkImportRequest::new("data_types");
1032    /// for i in 0..200 {
1033    ///     let id: String = UUIDv4.fake();
1034    ///     let mut blob_val = [0u8; 16];
1035    ///     rand::fill(&mut blob_val);
1036    ///     let bool_val = i % 2 == 0;
1037    ///     let double_val = rand::random_range::<f64, _>(0.0f64..99.99f64);
1038    ///     let int_val = rand::random_range::<i64, _>(0..10000);
1039    ///     let str_val: String = Name(ZH_CN).fake();
1040    ///     let row = Row::new()
1041    ///         .primary_key_column_string("str_id", &id)
1042    ///         .column_blob("blob_col", blob_val)
1043    ///         .column_bool("bool_col", bool_val)
1044    ///         .column_double("double_col", double_val)
1045    ///         .column_integer("int_col", int_val)
1046    ///         .column_string("str_col", &str_val);
1047    ///     req = req.put_row(row);
1048    /// }
1049    /// let res = client.bulk_import(req).send().await;
1050    /// ```
1051    pub fn bulk_import(&self, request: BulkImportRequest) -> BulkImportOperation {
1052        BulkImportOperation::new(self.clone(), request)
1053    }
1054
1055    /// 接口批量导出数据。
1056    ///
1057    /// # Examples
1058    ///
1059    /// ```
1060    /// let request = BulkExportRequest::new("data_types")
1061    ///     .end_primary_key_column_inf_min("str_id")
1062    ///     .end_primary_key_column_inf_max("str_id")
1063    ///     .columns_to_get(["str_id", "str_col", "int_col", "double_col", "blob_col", "bool_col"]);
1064    ///
1065    /// let res = client.bulk_export(request).send().await;
1066    /// let res = res.unwrap();
1067    /// total_rows += res.rows.len();
1068    ///
1069    /// res.rows.iter().for_each(|r| {
1070    ///     log::debug!("row: {:?}", r.get_primary_key_value("str_id").unwrap());
1071    /// });
1072    /// ```
1073    pub fn bulk_export(&self, request: BulkExportRequest) -> BulkExportOperation {
1074        BulkExportOperation::new(self.clone(), request)
1075    }
1076
1077    /// 创建二级索引
1078    pub fn create_index(&self, request: CreateIndexRequest) -> CreateIndexOperation {
1079        CreateIndexOperation::new(self.clone(), request)
1080    }
1081
1082    /// 删除二级索引
1083    pub fn drop_index(&self, table_name: &str, idx_name: &str) -> DropIndexOperation {
1084        DropIndexOperation::new(self.clone(), table_name, idx_name)
1085    }
1086
1087    /// 列出多元索引
1088    pub fn list_search_index(&self, table_name: Option<&str>) -> ListSearchIndexOperation {
1089        ListSearchIndexOperation::new(self.clone(), table_name)
1090    }
1091
1092    /// 创建多元索引
1093    pub fn create_search_index(&self, request: CreateSearchIndexRequest) -> CreateSearchIndexOperation {
1094        CreateSearchIndexOperation::new(self.clone(), request)
1095    }
1096
1097    /// 查询多元索引描述信息
1098    pub fn describe_search_index(&self, table_name: &str, index_name: &str) -> DescribeSearchIndexOperation {
1099        DescribeSearchIndexOperation::new(self.clone(), table_name, index_name)
1100    }
1101
1102    /// 修改多元索引
1103    pub fn update_search_index(&self, request: UpdateSearchIndexRequest) -> UpdateSearchIndexOperation {
1104        UpdateSearchIndexOperation::new(self.clone(), request)
1105    }
1106
1107    /// 删除多元索引
1108    pub fn delete_search_index(&self, table_name: &str, index_name: &str) -> DeleteSearchIndexOperation {
1109        DeleteSearchIndexOperation::new(self.clone(), table_name, index_name)
1110    }
1111
1112    /// 通过多元索引查询数据
1113    pub fn search(&self, request: SearchRequest) -> SearchOperation {
1114        SearchOperation::new(self.clone(), request)
1115    }
1116
1117    /// 计算多元索引的并发度
1118    pub fn compute_splits(&self, table_name: &str, index_name: &str) -> ComputeSplitsOperation {
1119        ComputeSplitsOperation::new(self.clone(), table_name, index_name)
1120    }
1121
1122    /// 并行扫描
1123    pub fn parallel_scan(&self, request: ParallelScanRequest) -> ParallelScanOperation {
1124        ParallelScanOperation::new(self.clone(), request)
1125    }
1126
1127    /// 时序表 - 查询数据
1128    pub fn get_timeseries_data(&self, request: GetTimeseriesDataRequest) -> GetTimeseriesDataOperation {
1129        GetTimeseriesDataOperation::new(self.clone(), request)
1130    }
1131
1132    /// 时序表 - 写入数据
1133    ///
1134    /// # Examples
1135    ///
1136    /// ```
1137    /// let client = OtsClient::from_env();
1138    ///
1139    /// let ts_us = (current_time_ms() * 1000) as u64;
1140    ///
1141    /// let request = PutTimeseriesDataRequest::new("timeseries_demo_with_data")
1142    ///     .row(
1143    ///         TimeseriesRow::new()
1144    ///             .measurement_name("measure_11")
1145    ///             .datasource("data_11")
1146    ///             .tag("cluster", "cluster_11")
1147    ///             .tag("region", "region_11")
1148    ///             .timestamp_us(ts_us)
1149    ///             .field_integer("temp", 123),
1150    ///     )
1151    ///     .row(
1152    ///         TimeseriesRow::new()
1153    ///             .measurement_name("measure_11")
1154    ///             .datasource("data_11")
1155    ///             .tag("cluster", "cluster_11")
1156    ///             .tag("region", "region_11")
1157    ///             .timestamp_us(ts_us + 1000)
1158    ///             .field_double("temp", 543.21),
1159    ///     );
1160    ///
1161    /// let resp = client.put_timeseries_data(request).send().await;
1162    /// ```
1163    pub fn put_timeseries_data(&self, request: PutTimeseriesDataRequest) -> PutTimeseriesDataOperation {
1164        PutTimeseriesDataOperation::new(self.clone(), request)
1165    }
1166
1167    /// 时序表 - 创建时序表
1168    pub fn create_timeseries_table(&self, request: CreateTimeseriesTableRequest) -> CreateTimeseriesTableOperation {
1169        CreateTimeseriesTableOperation::new(self.clone(), request)
1170    }
1171
1172    /// 时序表 - 查询时序表信息
1173    pub fn describe_timeseries_table(&self, table_name: &str) -> DescribeTimeseriesTableOperation {
1174        DescribeTimeseriesTableOperation::new(self.clone(), table_name)
1175    }
1176
1177    /// 时序表 - 列出时序表
1178    pub fn list_timeseries_table(&self) -> ListTimeseriesTableOperation {
1179        ListTimeseriesTableOperation::new(self.clone())
1180    }
1181
1182    /// 时序表 - 更新表配置
1183    pub fn update_timeseries_table(&self, request: UpdateTimeseriesTableRequest) -> UpdateTimeseriesTableOperation {
1184        UpdateTimeseriesTableOperation::new(self.clone(), request)
1185    }
1186
1187    /// 时序表 - 删除时序表
1188    pub fn delete_timeseries_table(&self, table_name: &str) -> DeleteTimeseriesTableOperation {
1189        DeleteTimeseriesTableOperation::new(self.clone(), table_name)
1190    }
1191
1192    /// 时序表 - 创建 lastpoint 索引
1193    pub fn create_timeseries_lastpoint_index(&self, request: CreateTimeseriesLastpointIndexRequest) -> CreateTimeseriesLastpointIndexOperation {
1194        CreateTimeseriesLastpointIndexOperation::new(self.clone(), request)
1195    }
1196
1197    /// 时序表 - 删除 lastpoint 索引
1198    pub fn delete_timeseries_lastpoint_index(&self, table_name: &str, index_name: &str) -> DeleteTimeseriesLastpointIndexOperation {
1199        DeleteTimeseriesLastpointIndexOperation::new(self.clone(), table_name, index_name)
1200    }
1201
1202    /// 时序表 - 创建分析存储
1203    pub fn create_timeseries_analytical_store(&self, request: CreateTimeseriesAnalyticalStoreRequest) -> CreateTimeseriesAnalyticalStoreOperation {
1204        CreateTimeseriesAnalyticalStoreOperation::new(self.clone(), request)
1205    }
1206
1207    /// 时序表 - 更新分析存储
1208    pub fn update_timeseries_analytical_store(&self, request: UpdateTimeseriesAnalyticalStoreRequest) -> UpdateTimeseriesAnalyticalStoreOperation {
1209        UpdateTimeseriesAnalyticalStoreOperation::new(self.clone(), request)
1210    }
1211
1212    /// 时序表 - 删除分析存储
1213    pub fn delete_timeseries_analytical_store(&self, request: DeleteTimeseriesAnalyticalStoreRequest) -> DeleteTimeseriesAnalyticalStoreOperation {
1214        DeleteTimeseriesAnalyticalStoreOperation::new(self.clone(), request)
1215    }
1216
1217    /// 时序表 - 查询分析存储的信息
1218    pub fn describe_timeseries_analytical_store(&self, table_name: &str, store_name: &str) -> DescribeTimeseriesAnalyticalStoreOperation {
1219        DescribeTimeseriesAnalyticalStoreOperation::new(self.clone(), table_name, store_name)
1220    }
1221
1222    /// 时序表 - 查询元数据
1223    pub fn query_timeseries_meta(&self, request: QueryTimeseriesMetaRequest) -> QueryTimeseriesMetaOperation {
1224        QueryTimeseriesMetaOperation::new(self.clone(), request)
1225    }
1226
1227    /// 时序表 - 更新时间线元数据
1228    pub fn update_timeseries_meta(&self, request: UpdateTimeseriesMetaRequest) -> UpdateTimeseriesMetaOperation {
1229        UpdateTimeseriesMetaOperation::new(self.clone(), request)
1230    }
1231
1232    /// 时序表 - 删除时间线元数据
1233    pub fn delete_timeseries_meta(&self, request: DeleteTimeseriesMetaRequest) -> DeleteTimeseriesMetaOperation {
1234        DeleteTimeseriesMetaOperation::new(self.clone(), request)
1235    }
1236
1237    /// 时序表 - 切分全量导出任务
1238    pub fn split_timeseries_scan_task(&self, request: SplitTimeseriesScanTaskRequest) -> SplitTimeseriesScanTaskOperation {
1239        SplitTimeseriesScanTaskOperation::new(self.clone(), request)
1240    }
1241
1242    /// 时序表 - 扫描数据
1243    pub fn scan_timeseries_data(&self, request: ScanTimeseriesDataRequest) -> ScanTimeseriesDataOperation {
1244        ScanTimeseriesDataOperation::new(self.clone(), request)
1245    }
1246
1247    /// SQL 查询。注意:返回的行中,主键值和普通列的值都放到普通列 `columns` 或者 `fields` 中了。
1248    ///
1249    /// # Examples
1250    ///
1251    /// ```
1252    /// let client = OtsClient::from_env();
1253    /// let req = SqlQueryRequest::new("select * from timeseries_demo_with_data where _m_name = 'measure_11'");
1254    /// let resp = client.sql_query(req).send::<TimeseriesRow>().await;
1255    /// log::debug!("timeseries table: {:?}", resp)
1256    /// ```
1257    pub fn sql_query(&self, request: SqlQueryRequest) -> SqlQueryOperation {
1258        SqlQueryOperation::new(self.clone(), request)
1259    }
1260}