1use std::{collections::HashMap, fmt::Display, str::FromStr, time::Duration};
2
3use base64::{prelude::BASE64_STANDARD, Engine};
4use bytes::Bytes;
5use defined_column::{AddDefinedColumnOperation, AddDefinedColumnRequest, DeleteDefinedColumnOperation, DeleteDefinedColumnRequest};
6use error::OtsError;
7use index::{CreateIndexOperation, CreateIndexRequest, DropIndexOperation};
8use lastpoint_index::{CreateTimeseriesLastpointIndexOperation, CreateTimeseriesLastpointIndexRequest, DeleteTimeseriesLastpointIndexOperation};
9use prost::Message;
10use protos::search::{CreateSearchIndexRequest, UpdateSearchIndexRequest};
11use reqwest::{
12 header::{HeaderMap, HeaderName, HeaderValue},
13 Response,
14};
15
16use analytical_store::{
17 CreateTimeseriesAnalyticalStoreOperation, CreateTimeseriesAnalyticalStoreRequest, DeleteTimeseriesAnalyticalStoreOperation,
18 DeleteTimeseriesAnalyticalStoreRequest, DescribeTimeseriesAnalyticalStoreOperation, UpdateTimeseriesAnalyticalStoreOperation,
19 UpdateTimeseriesAnalyticalStoreRequest,
20};
21use data::{
22 BatchGetRowOperation, BatchGetRowRequest, BatchWriteRowOperation, BatchWriteRowRequest, BulkExportOperation, BulkExportRequest, BulkImportOperation,
23 BulkImportRequest, DeleteRowOperation, DeleteRowRequest, GetRangeOperation, GetRangeRequest, GetRowOperation, GetRowRequest, PutRowOperation,
24 PutRowRequest, UpdateRowOperation, UpdateRowRequest,
25};
26use search::{
27 ComputeSplitsOperation, CreateSearchIndexOperation, DeleteSearchIndexOperation, DescribeSearchIndexOperation, ListSearchIndexOperation,
28 ParallelScanOperation, ParallelScanRequest, SearchOperation, SearchRequest, UpdateSearchIndexOperation,
29};
30use sql::{SqlQueryOperation, SqlQueryRequest};
31use table::{
32 ComputeSplitPointsBySizeOperation, ComputeSplitPointsBySizeRequest, CreateTableOperation, CreateTableRequest, DeleteTableOperation, DescribeTableOperation,
33 ListTableOperation, UpdateTableOperation, UpdateTableRequest,
34};
35use timeseries_data::{
36 DeleteTimeseriesMetaOperation, DeleteTimeseriesMetaRequest, GetTimeseriesDataOperation, GetTimeseriesDataRequest, PutTimeseriesDataOperation,
37 PutTimeseriesDataRequest, QueryTimeseriesMetaOperation, QueryTimeseriesMetaRequest, ScanTimeseriesDataOperation, ScanTimeseriesDataRequest,
38 SplitTimeseriesScanTaskOperation, SplitTimeseriesScanTaskRequest, UpdateTimeseriesMetaOperation, UpdateTimeseriesMetaRequest,
39};
40use timeseries_table::{
41 CreateTimeseriesTableOperation, CreateTimeseriesTableRequest, DeleteTimeseriesTableOperation, DescribeTimeseriesTableOperation,
42 ListTimeseriesTableOperation, UpdateTimeseriesTableOperation, UpdateTimeseriesTableRequest,
43};
44use url::Url;
45use util::{get_iso8601_date_time_string, hmac_sha256};
46
47pub mod analytical_store;
48pub mod crc8;
49pub mod data;
50pub mod defined_column;
51pub mod error;
52pub mod index;
53pub mod lastpoint_index;
54pub mod macros;
55pub mod model;
56pub mod protos;
57pub mod search;
58pub mod sql;
59pub mod table;
60pub mod timeseries_data;
61pub mod timeseries_model;
62pub mod timeseries_table;
63pub mod util;
64
65#[cfg(test)]
66pub mod test_util;
67
/// Value this client sends in the `user-agent` request header.
const USER_AGENT: &str = "aliyun-tablestore-rs/0.1.2";
// Names of the `x-ots-*` request headers this client fills in before sending.
// Every header whose lowercased name starts with `x-ots-` (except the signature
// header itself) takes part in the request signature.
/// Header carrying [`API_VERSION`].
const HEADER_API_VERSION: &str = "x-ots-apiversion";
/// Header carrying the access key id.
const HEADER_ACCESS_KEY_ID: &str = "x-ots-accesskeyid";
/// Header carrying the base64-encoded MD5 digest of the request body.
const HEADER_CONTENT_MD5: &str = "x-ots-contentmd5";
/// Header carrying the base64-encoded signature produced by `fill_signature_v2`.
const HEADER_SIGNATURE: &str = "x-ots-signature";
/// Header carrying the request date-time string.
const HEADER_DATE: &str = "x-ots-date";
/// Header carrying the STS token; only set when the client has one.
const HEADER_STS_TOKEN: &str = "x-ots-ststoken";
/// Header carrying the client's region.
const HEADER_SIGN_REGION: &str = "x-ots-signregion";
/// Header carrying the first ten characters of the date-time string with `-` removed.
const HEADER_SIGN_DATE: &str = "x-ots-signdate";
/// Header carrying the instance name.
const HEADER_INSTANCE_NAME: &str = "x-ots-instancename";
/// Header carrying the base64-encoded signature produced by `fill_signature_v4`.
const HEADER_SIGNATURE_V4: &str = "x-ots-signaturev4";

/// Value sent in the [`HEADER_API_VERSION`] header.
const API_VERSION: &str = "2015-12-31";
81
/// Shorthand for a `Result` whose error type is [`OtsError`].
pub type OtsResult<T> = Result<T, OtsError>;
83
/// Operations this crate knows by name.
///
/// The `Display` form of a variant is the operation name; `OtsClient::send`
/// appends it to the endpoint URL and uses it when signing the request.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum OtsOp {
    /// Placeholder used by `OtsRequest::default()`; displayed as `_Undefined_`.
    #[default]
    Undefined,

    // Table operations.
    CreateTable,
    UpdateTable,
    ListTable,
    DescribeTable,
    DeleteTable,
    ComputeSplitPointsBySize,

    // Defined (predefined) column operations.
    AddDefinedColumn,
    DeleteDefinedColumn,

    // Row data operations.
    GetRow,
    GetRange,
    PutRow,
    UpdateRow,
    DeleteRow,
    BatchGetRow,
    BatchWriteRow,
    BulkImport,
    BulkExport,

    // Stream operations.
    ListStream,
    DescribeStream,
    GetShardIterator,
    GetStreamRecord,

    // Secondary index operations.
    CreateIndex,
    DropIndex,

    // Timeseries table operations.
    CreateTimeseriesTable,
    ListTimeseriesTable,
    DescribeTimeseriesTable,
    UpdateTimeseriesTable,
    DeleteTimeseriesTable,

    // Timeseries data and meta operations.
    PutTimeseriesData,
    GetTimeseriesData,
    UpdateTimeseriesMeta,
    QueryTimeseriesMeta,
    DeleteTimeseriesMeta,
    SplitTimeseriesScanTask,
    ScanTimeseriesData,

    // Timeseries lastpoint index operations.
    CreateTimeseriesLastpointIndex,
    DeleteTimeseriesLastpointIndex,

    // Timeseries analytical store operations.
    CreateTimeseriesAnalyticalStore,
    UpdateTimeseriesAnalyticalStore,
    DescribeTimeseriesAnalyticalStore,
    DeleteTimeseriesAnalyticalStore,

    // Search index operations.
    CreateSearchIndex,
    UpdateSearchIndex,
    ListSearchIndex,
    DescribeSearchIndex,
    DeleteSearchIndex,
    Search,
    ComputeSplits,
    ParallelScan,

    // Tunnel operations.
    CreateTunnel,
    ListTunnel,
    DescribeTunnel,
    DeleteTunnel,

    // SQL query.
    SQLQuery,
}
166
167impl From<OtsOp> for String {
168 fn from(value: OtsOp) -> Self {
169 value.to_string()
170 }
171}
172
173impl Display for OtsOp {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 let s = match self {
176 OtsOp::Undefined => "_Undefined_",
177
178 OtsOp::CreateTable => "CreateTable",
179 OtsOp::UpdateTable => "UpdateTable",
180 OtsOp::ListTable => "ListTable",
181 OtsOp::DescribeTable => "DescribeTable",
182 OtsOp::DeleteTable => "DeleteTable",
183 OtsOp::ComputeSplitPointsBySize => "ComputeSplitPointsBySize",
184
185 OtsOp::AddDefinedColumn => "AddDefinedColumn",
186 OtsOp::DeleteDefinedColumn => "DeleteDefinedColumn",
187
188 OtsOp::GetRow => "GetRow",
189 OtsOp::GetRange => "GetRange",
190 OtsOp::PutRow => "PutRow",
191 OtsOp::UpdateRow => "UpdateRow",
192 OtsOp::DeleteRow => "DeleteRow",
193 OtsOp::BatchGetRow => "BatchGetRow",
194 OtsOp::BatchWriteRow => "BatchWriteRow",
195 OtsOp::BulkImport => "BulkImport",
196 OtsOp::BulkExport => "BulkExport",
197
198 OtsOp::CreateTunnel => "CreateTunnel",
199 OtsOp::ListTunnel => "ListTunnel",
200 OtsOp::DescribeTunnel => "DescribeTunnel",
201 OtsOp::DeleteTunnel => "DeleteTunnel",
202
203 OtsOp::ListStream => "ListStream",
204 OtsOp::DescribeStream => "DescribeStream",
205 OtsOp::GetShardIterator => "GetShardIterator",
206 OtsOp::GetStreamRecord => "GetStreamRecord",
207
208 OtsOp::CreateIndex => "CreateIndex",
209 OtsOp::DropIndex => "DropIndex",
210
211 OtsOp::CreateTimeseriesTable => "CreateTimeseriesTable",
212 OtsOp::ListTimeseriesTable => "ListTimeseriesTable",
213 OtsOp::DescribeTimeseriesTable => "DescribeTimeseriesTable",
214 OtsOp::UpdateTimeseriesTable => "UpdateTimeseriesTable",
215 OtsOp::DeleteTimeseriesTable => "DeleteTimeseriesTable",
216
217 OtsOp::PutTimeseriesData => "PutTimeseriesData",
218 OtsOp::GetTimeseriesData => "GetTimeseriesData",
219 OtsOp::UpdateTimeseriesMeta => "UpdateTimeseriesMeta",
220 OtsOp::QueryTimeseriesMeta => "QueryTimeseriesMeta",
221 OtsOp::DeleteTimeseriesMeta => "DeleteTimeseriesMeta",
222 OtsOp::SplitTimeseriesScanTask => "SplitTimeseriesScanTask",
223 OtsOp::ScanTimeseriesData => "ScanTimeseriesData",
224
225 OtsOp::CreateTimeseriesLastpointIndex => "CreateTimeseriesLastpointIndex",
226 OtsOp::DeleteTimeseriesLastpointIndex => "DeleteTimeseriesLastpointIndex",
227
228 OtsOp::CreateTimeseriesAnalyticalStore => "CreateTimeseriesAnalyticalStore",
229 OtsOp::UpdateTimeseriesAnalyticalStore => "UpdateTimeseriesAnalyticalStore",
230 OtsOp::DescribeTimeseriesAnalyticalStore => "DescribeTimeseriesAnalyticalStore",
231 OtsOp::DeleteTimeseriesAnalyticalStore => "DeleteTimeseriesAnalyticalStore",
232
233 OtsOp::CreateSearchIndex => "CreateSearchIndex",
234 OtsOp::UpdateSearchIndex => "UpdateSearchIndex",
235 OtsOp::ListSearchIndex => "ListSearchIndex",
236 OtsOp::DescribeSearchIndex => "DescribeSearchIndex",
237 OtsOp::DeleteSearchIndex => "DeleteSearchIndex",
238 OtsOp::Search => "Search",
239 OtsOp::ComputeSplits => "ComputeSplits",
240 OtsOp::ParallelScan => "ParallelScan",
241
242 OtsOp::SQLQuery => "SQLQuery",
243 };
244
245 write!(f, "{}", s)
246 }
247}
248
249impl OtsOp {
250 pub fn is_idempotent(&self) -> bool {
252 matches!(
253 self,
254 Self::ListTable
255 | Self::DescribeTable
256 | Self::GetRow
257 | Self::GetRange
258 | Self::BatchGetRow
259 | Self::BulkExport
260 | Self::ListStream
261 | Self::DescribeStream
262 | Self::GetShardIterator
263 | Self::ComputeSplitPointsBySize
264 | Self::GetTimeseriesData
265 | Self::QueryTimeseriesMeta
266 | Self::ListTimeseriesTable
267 | Self::DescribeTimeseriesTable
268 | Self::ScanTimeseriesData
269 | Self::DescribeTimeseriesAnalyticalStore
270 | Self::ParallelScan
271 | Self::ComputeSplits
272 | Self::ListTunnel
273 | Self::DescribeTunnel
274 | Self::SQLQuery
275 )
276 }
277}
278
/// Per-request options carried by an [`OtsRequest`].
#[derive(Debug, Default, Clone)]
pub struct OtsRequestOptions {
    /// Request timeout in milliseconds. When `Some`, `OtsClient::send` applies it
    /// to each HTTP attempt; when `None`, no per-request timeout is set there.
    pub timeout_ms: Option<u64>,
}
283
/// A request ready to be handed to `OtsClient::send`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct OtsRequest {
    /// HTTP method used for the call.
    method: reqwest::Method,
    /// Operation being invoked; its `Display` text becomes the URL path.
    operation: OtsOp,
    /// Extra headers; `send` adds the signing headers to this map.
    headers: HashMap<String, String>,
    /// Query parameters. NOTE(review): `send` currently ignores this field.
    query: HashMap<String, String>,
    /// Raw request body bytes.
    body: Vec<u8>,
    /// Per-request options such as the timeout.
    options: OtsRequestOptions,
}
295
296impl Default for OtsRequest {
297 fn default() -> Self {
298 Self {
299 method: reqwest::Method::POST,
300 operation: OtsOp::Undefined,
301 headers: HashMap::new(),
302 query: HashMap::new(),
303 body: Vec::new(),
304 options: OtsRequestOptions::default(),
305 }
306 }
307}
308
/// Decides whether and when a failed request is retried by `OtsClient::send`.
pub trait RetryPolicy: std::fmt::Debug + Send + Sync {
    /// Returns `true` when the failed call should be attempted again.
    ///
    /// `retried` is the number of retries already performed for this request
    /// (`send` passes `0` after the first failure), `op` is the operation being
    /// called and `ots_error` is the error produced by the latest attempt.
    fn should_retry(&self, retried: u32, op: OtsOp, ots_error: &OtsError) -> bool;

    /// Number of milliseconds `send` sleeps before the next attempt.
    fn delay_ms(&self) -> u32;

    /// Returns a boxed copy of this policy; this is what makes
    /// `Box<dyn RetryPolicy>` cloneable.
    fn clone_box(&self) -> Box<dyn RetryPolicy>;
}
320
321impl Clone for Box<dyn RetryPolicy> {
322 fn clone(&self) -> Box<dyn RetryPolicy> {
323 self.clone_box()
324 }
325}
326
/// Retry policy used when none is supplied to the client.
#[derive(Debug, Copy, Clone)]
pub struct DefaultRetryPolicy {
    /// Upper bound on the number of retries for a single request.
    pub max_retry_times: u32,
}

impl Default for DefaultRetryPolicy {
    /// Allows up to 10 retries.
    fn default() -> Self {
        let max_retry_times = 10;
        DefaultRetryPolicy { max_retry_times }
    }
}
339
340impl DefaultRetryPolicy {
341 const RETRY_NO_MATTER_ACTIONS_ERR_CODES: &[&'static str] = &[
343 "OTSRowOperationConflict",
344 "OTSNotEnoughCapacityUnit",
345 "OTSTableNotReady",
346 "OTSPartitionUnavailable",
347 "OTSServerBusy",
348 ];
349
350 const ERR_OTS_QUOTA_EXHAUSTED_MSG: &str = "Too frequent table operations.";
351
352 const RETRY_FOR_IDEMPOTENT_ACTIONS_ERR_CODES: &[&'static str] =
354 &["OTSTimeout", "OTSInternalServerError", "OTSServerUnavailable", "OTSTunnelServerUnavailable"];
355
356 fn should_retry_inner(&self, retried: u32, op: OtsOp, ots_error: &OtsError) -> bool {
357 if retried >= self.max_retry_times {
358 log::info!("max retry reached {} times for operation {} with error {}", self.max_retry_times, op, ots_error);
359 return false;
360 }
361
362 match ots_error {
363 OtsError::ReqwestError(_) => true,
365
366 OtsError::StatusError(code, _) => code.is_server_error() && op.is_idempotent(),
368
369 OtsError::ApiError(api_error)
371 if api_error.code == "OTSQuotaExhausted" && api_error.message == Some(Self::ERR_OTS_QUOTA_EXHAUSTED_MSG.to_string()) =>
372 {
373 true
374 }
375
376 OtsError::ApiError(api_error) => {
378 (Self::RETRY_NO_MATTER_ACTIONS_ERR_CODES.contains(&api_error.code.as_str()))
379 || (op.is_idempotent() && Self::RETRY_FOR_IDEMPOTENT_ACTIONS_ERR_CODES.contains(&api_error.code.as_str()))
380 }
381
382 _ => false,
383 }
384 }
385}
386
387impl RetryPolicy for DefaultRetryPolicy {
388 fn should_retry(&self, retried: u32, op: OtsOp, ots_error: &OtsError) -> bool {
389 self.should_retry_inner(retried, op, ots_error)
390 }
391
392 fn clone_box(&self) -> Box<dyn RetryPolicy> {
393 Box::new(DefaultRetryPolicy::default())
394 }
395
396 fn delay_ms(&self) -> u32 {
397 10000
398 }
399}
400
/// Builder for [`OtsClient`].
///
/// Only the access key pair is required up front; every other setting starts
/// empty (or `None`) and is copied into the client as-is by `build`.
#[derive(Clone)]
pub struct OtsClientBuilder {
    /// Access key id.
    access_key_id: String,
    /// Access key secret used to sign requests.
    access_key_secret: String,
    /// Optional STS token.
    sts_token: Option<String>,
    /// Retry policy handed to the client.
    retry_policy: Box<dyn RetryPolicy>,
    /// Region name; empty unless set.
    region: String,
    /// Instance name; empty unless set.
    instance_name: String,
    /// Endpoint URL; empty unless set.
    endpoint: String,
    /// HTTP client to use; `build` creates a new one when this is `None`.
    http_client: Option<reqwest::Client>,
}
420
421impl OtsClientBuilder {
422 pub fn new(ak_id: impl AsRef<str>, ak_sec: impl AsRef<str>) -> Self {
423 Self {
424 access_key_id: ak_id.as_ref().to_string(),
425 access_key_secret: ak_sec.as_ref().to_string(),
426 retry_policy: Box::new(DefaultRetryPolicy::default()),
427 sts_token: None,
428 region: String::new(),
429 instance_name: String::new(),
430 endpoint: String::new(),
431 http_client: None,
432 }
433 }
434
435 pub fn sts_token(mut self, token: impl AsRef<str>) -> Self {
437 self.sts_token = Some(token.as_ref().to_string());
438
439 self
440 }
441
442 pub fn rety_policy(mut self, policy: Box<dyn RetryPolicy>) -> Self {
444 self.retry_policy = policy;
445
446 self
447 }
448
449 pub fn region(mut self, region: impl AsRef<str>) -> Self {
451 self.region = region.as_ref().to_string();
452
453 self
454 }
455
456 pub fn instance_name(mut self, instance_name: impl AsRef<str>) -> Self {
458 self.instance_name = instance_name.as_ref().to_string();
459
460 self
461 }
462
463 pub fn endpoint(mut self, endpoint: impl AsRef<str>) -> Self {
465 self.endpoint = endpoint.as_ref().to_string();
466
467 self
468 }
469
470 pub fn http_client(mut self, client: reqwest::Client) -> Self {
472 self.http_client = Some(client);
473
474 self
475 }
476
477 pub fn retry_policy_mut(&mut self) -> &mut Box<dyn RetryPolicy> {
479 &mut self.retry_policy
480 }
481
482 pub fn build(self) -> OtsClient {
483 let Self {
484 access_key_id,
485 access_key_secret,
486 sts_token,
487 retry_policy,
488 region,
489 instance_name,
490 endpoint,
491 http_client,
492 } = self;
493
494 OtsClient {
495 access_key_id,
496 access_key_secret,
497 sts_token,
498 region,
499 instance_name,
500 endpoint,
501 http_client: http_client.unwrap_or(reqwest::Client::new()),
502 retry_policy,
503 }
504 }
505}
506
/// Client that signs and sends requests to a single endpoint.
///
/// Every operation helper on this type clones the client into the operation
/// object it returns.
#[allow(dead_code)]
#[derive(Clone)]
pub struct OtsClient {
    /// Access key id, sent in the `x-ots-accesskeyid` header.
    access_key_id: String,
    /// Access key secret, used as the signing key.
    access_key_secret: String,
    /// Optional STS token, sent in the `x-ots-ststoken` header when present.
    sts_token: Option<String>,
    /// Region, sent in the `x-ots-signregion` header.
    region: String,
    /// Instance name, sent in the `x-ots-instancename` header.
    instance_name: String,
    /// Endpoint URL; the operation name is appended to it as the path.
    endpoint: String,
    /// Underlying HTTP client.
    http_client: reqwest::Client,
    /// Policy consulted by `send` after a failed attempt.
    retry_policy: Box<dyn RetryPolicy>,
}
520
521impl std::fmt::Debug for OtsClient {
522 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
523 f.debug_struct("OtsClient")
524 .field("access_key_id", &self.access_key_id)
525 .field("region", &self.region)
526 .field("instance_name", &self.instance_name)
527 .field("endpoint", &self.endpoint)
528 .field("http_client", &self.http_client)
529 .finish()
530 }
531}
532
533impl OtsClient {
534 fn parse_instance_and_region(endpoint: &str) -> (&str, &str) {
535 let s = endpoint.strip_prefix("http://").unwrap_or(endpoint);
536 let s = s.strip_prefix("https://").unwrap_or(s);
537 let parts = s.split(".").collect::<Vec<_>>();
538 if parts.len() < 2 {
539 panic!("can not parse instance name and region from endpoint: {}", endpoint);
540 }
541
542 (parts[0], parts[1])
543 }
544
545 pub fn from_env() -> Self {
551 let access_key_id = std::env::var("ALIYUN_OTS_AK_ID").expect("env var ALI_ACCESS_KEY_ID is missing");
552 let access_key_secret = std::env::var("ALIYUN_OTS_AK_SEC").expect("env var ALI_ACCESS_KEY_SECRET is missing");
553 let endpoint = std::env::var("ALIYUN_OTS_ENDPOINT").expect("env var ALI_OSS_ENDPOINT is missing");
554 let endpoint = endpoint.to_lowercase();
555 let (instance_name, region) = Self::parse_instance_and_region(endpoint.as_str());
556
557 Self {
558 access_key_id,
559 access_key_secret,
560 sts_token: None,
561 region: region.to_string(),
562 instance_name: instance_name.to_string(),
563 endpoint,
564 http_client: reqwest::Client::new(),
565 retry_policy: Box::new(DefaultRetryPolicy::default()),
566 }
567 }
568
569 pub fn new(ak_id: impl AsRef<str>, ak_sec: impl AsRef<str>, endpoint: impl AsRef<str>) -> Self {
577 let endpoint = endpoint.as_ref().to_lowercase();
578
579 let (instance_name, region) = Self::parse_instance_and_region(endpoint.as_str());
580
581 Self {
582 access_key_id: ak_id.as_ref().to_string(),
583 access_key_secret: ak_sec.as_ref().to_string(),
584 region: region.to_string(),
585 instance_name: instance_name.to_string(),
586 endpoint,
587 http_client: reqwest::Client::new(),
588 sts_token: None,
589 retry_policy: Box::new(DefaultRetryPolicy::default()),
590 }
591 }
592
    /// Returns an [`OtsClientBuilder`] seeded with the given access key id and secret.
    pub fn builder(ak_id: impl AsRef<str>, ak_sec: impl AsRef<str>) -> OtsClientBuilder {
        OtsClientBuilder::new(ak_id, ak_sec)
    }
602
603 fn fill_signature_v2(&self, operation: &str, headers: &mut HashMap<String, String>) {
605 let date_time_string = get_iso8601_date_time_string();
606 let date = &date_time_string[..10].replace("-", "");
607
608 headers.insert("user-agent".to_string(), USER_AGENT.to_string());
609 headers.insert(HEADER_API_VERSION.to_string(), API_VERSION.to_string());
610 headers.insert(HEADER_DATE.to_string(), date_time_string.clone());
611 headers.insert(HEADER_SIGN_DATE.to_string(), date.to_string());
612 headers.insert(HEADER_ACCESS_KEY_ID.to_string(), self.access_key_id.clone());
613 headers.insert(HEADER_INSTANCE_NAME.to_string(), self.instance_name.clone());
614 headers.insert(HEADER_SIGN_REGION.to_string(), self.region.clone());
615
616 if let Some(s) = &self.sts_token {
617 headers.insert(HEADER_STS_TOKEN.to_string(), s.to_string());
618 }
619
620 let mut canonical_headers = headers
621 .iter()
622 .map(|(k, v)| (k.to_lowercase(), v))
623 .filter(|(k, _)| k.starts_with("x-ots-") && k != HEADER_SIGNATURE)
624 .map(|(k, v)| format!("{}:{}", k, v))
625 .collect::<Vec<_>>();
626 canonical_headers.sort();
627
628 let canonical_headers = canonical_headers.join("\n");
629
630 let string_to_sign = format!("/{}\nPOST\n\n{}\n", operation, canonical_headers);
631
632 log::debug!("string to sign: \n-----\n{}\n-----", string_to_sign);
633 let sig = util::hmac_sha1(self.access_key_secret.as_bytes(), string_to_sign.as_bytes());
634 let sig_string = BASE64_STANDARD.encode(&sig);
635
636 log::debug!("signature = {}", sig_string);
637
638 headers.insert(HEADER_SIGNATURE.to_string(), sig_string);
639 }
640
641 #[allow(dead_code)]
643 fn fill_signature_v4(&self, operation: &str, headers: &mut HashMap<String, String>) {
644 let date_time_string = get_iso8601_date_time_string();
645 let date_string = &date_time_string[..10].replace("-", "");
646
647 headers.insert("user-agent".to_string(), USER_AGENT.to_string());
648 headers.insert(HEADER_API_VERSION.to_string(), API_VERSION.to_string());
649 headers.insert(HEADER_DATE.to_string(), date_time_string.clone());
650 headers.insert(HEADER_SIGN_DATE.to_string(), date_string.to_string());
651 headers.insert(HEADER_ACCESS_KEY_ID.to_string(), self.access_key_id.clone());
652 headers.insert(HEADER_INSTANCE_NAME.to_string(), self.instance_name.clone());
653 headers.insert(HEADER_SIGN_REGION.to_string(), self.region.clone());
654
655 if let Some(s) = &self.sts_token {
656 headers.insert(HEADER_STS_TOKEN.to_string(), s.to_string());
657 }
658
659 let mut canonical_headers = headers
660 .iter()
661 .map(|(k, v)| (k.to_lowercase(), v))
662 .filter(|(k, _)| k.starts_with("x-ots-") && k != HEADER_SIGNATURE_V4)
663 .map(|(k, v)| format!("{}:{}", k, v))
664 .collect::<Vec<_>>();
665 canonical_headers.sort();
666
667 let canonical_headers = canonical_headers.join("\n");
668
669 let string_to_sign = format!("/{}\nPOST\n\n{}\nots", operation, canonical_headers);
670
671 log::debug!("string to sign: \n-----\n{}\n----", string_to_sign);
672
673 let sign = hmac_sha256(self.access_key_secret.as_bytes(), string_to_sign.as_bytes());
674
675 headers.insert(HEADER_SIGNATURE_V4.to_string(), BASE64_STANDARD.encode(sign));
676 }
677
678 pub async fn send(&self, req: OtsRequest) -> OtsResult<Response> {
680 let OtsRequest {
681 method,
682 operation,
683 mut headers,
684 query: _,
685 body,
686 options,
687 } = req;
688
689 headers.insert("content-lenght".to_string(), format!("{}", body.len()));
691 let content_md5_base64 = BASE64_STANDARD.encode(md5::compute(&body).as_slice());
692 headers.insert(HEADER_CONTENT_MD5.to_string(), content_md5_base64);
693
694 let url = Url::parse(format!("{}/{}", self.endpoint, operation).as_str()).unwrap();
695 let request_body = Bytes::from_owner(body);
696 let mut retried = 0u32;
697
698 loop {
699 self.fill_signature_v2(&operation.to_string(), &mut headers);
700
701 let mut header_map = HeaderMap::new();
702 headers.iter().for_each(|(k, v)| {
703 log::debug!(">> header: {}: {}", k, v);
704 header_map.insert(HeaderName::from_str(&k.to_lowercase()).unwrap(), HeaderValue::from_str(v).unwrap());
705 });
706
707 let mut request_builder = self
708 .http_client
709 .request(method.clone(), url.clone())
710 .headers(header_map.clone())
711 .body(request_body.clone());
712
713 if let Some(ms) = options.timeout_ms {
715 request_builder = request_builder.timeout(Duration::from_millis(ms));
716 }
717
718 let response = request_builder.send().await?;
719
720 response.headers().iter().for_each(|(k, v)| {
721 log::debug!("<< header: {}: {}", k, v.to_str().unwrap());
722 });
723
724 if response.status().is_success() {
725 return Ok(response);
726 }
727
728 if !&response.status().is_success() {
729 let status = response.status();
730
731 let e = match response.bytes().await {
732 Ok(bytes) => {
733 let api_error = protos::Error::decode(bytes)?;
734 OtsError::ApiError(Box::new(api_error))
735 }
736 Err(_) => OtsError::StatusError(status, "".to_string()),
737 };
738
739 log::error!("api call failed, check retry against retry policy for operation {} and error {}", operation, e);
740 let should_retry = self.retry_policy.should_retry(retried, operation, &e);
741 log::info!("should retry: {} for operation {} with error {}", should_retry, operation, e);
742
743 if !should_retry {
744 return Err(e);
745 }
746
747 let next_delay = self.retry_policy.delay_ms();
748 log::info!("delay for {} ms to retry", next_delay);
749 tokio::time::sleep(tokio::time::Duration::from_millis(next_delay as u64)).await;
750
751 retried += 1;
752 }
753 }
754 }
755
    /// Builds a [`ListTableOperation`] from a clone of this client.
    pub fn list_table(&self) -> ListTableOperation {
        ListTableOperation::new(self.clone())
    }
760
    /// Builds a [`CreateTableOperation`] from a clone of this client and `request`.
    pub fn create_table(&self, request: CreateTableRequest) -> CreateTableOperation {
        CreateTableOperation::new(self.clone(), request)
    }
791
    /// Builds an [`UpdateTableOperation`] from a clone of this client and `request`.
    pub fn update_table(&self, request: UpdateTableRequest) -> UpdateTableOperation {
        UpdateTableOperation::new(self.clone(), request)
    }
796
    /// Builds a [`DescribeTableOperation`] for `table_name` from a clone of this client.
    pub fn describe_table(&self, table_name: &str) -> DescribeTableOperation {
        DescribeTableOperation::new(self.clone(), table_name)
    }
801
    /// Builds a [`DeleteTableOperation`] for `table_name` from a clone of this client.
    pub fn delete_table(&self, table_name: &str) -> DeleteTableOperation {
        DeleteTableOperation::new(self.clone(), table_name)
    }
806
    /// Builds a [`ComputeSplitPointsBySizeOperation`] from a clone of this client and `request`.
    pub fn compute_split_points_by_size(&self, request: ComputeSplitPointsBySizeRequest) -> ComputeSplitPointsBySizeOperation {
        ComputeSplitPointsBySizeOperation::new(self.clone(), request)
    }
811
    /// Builds an [`AddDefinedColumnOperation`] from a clone of this client and `request`.
    pub fn add_defined_column(&self, request: AddDefinedColumnRequest) -> AddDefinedColumnOperation {
        AddDefinedColumnOperation::new(self.clone(), request)
    }
830
    /// Builds a [`DeleteDefinedColumnOperation`] from a clone of this client and `request`.
    pub fn delete_defined_column(&self, request: DeleteDefinedColumnRequest) -> DeleteDefinedColumnOperation {
        DeleteDefinedColumnOperation::new(self.clone(), request)
    }
844
    /// Builds a [`GetRowOperation`] from a clone of this client and `request`.
    pub fn get_row(&self, request: GetRowRequest) -> GetRowOperation {
        GetRowOperation::new(self.clone(), request)
    }
863
    /// Builds a [`GetRangeOperation`] from a clone of this client and `request`.
    pub fn get_range(&self, request: GetRangeRequest) -> GetRangeOperation {
        GetRangeOperation::new(self.clone(), request)
    }
899
    /// Builds a [`PutRowOperation`] from a clone of this client and `request`.
    pub fn put_row(&self, request: PutRowRequest) -> PutRowOperation {
        PutRowOperation::new(self.clone(), request)
    }
919
    /// Builds an [`UpdateRowOperation`] from a clone of this client and `request`.
    pub fn update_row(&self, request: UpdateRowRequest) -> UpdateRowOperation {
        UpdateRowOperation::new(self.clone(), request)
    }
944
    /// Builds a [`DeleteRowOperation`] from a clone of this client and `request`.
    pub fn delete_row(&self, request: DeleteRowRequest) -> DeleteRowOperation {
        DeleteRowOperation::new(self.clone(), request)
    }
957
    /// Builds a [`BatchGetRowOperation`] from a clone of this client and `request`.
    pub fn batch_get_row(&self, request: BatchGetRowRequest) -> BatchGetRowOperation {
        BatchGetRowOperation::new(self.clone(), request)
    }
986
    /// Builds a [`BatchWriteRowOperation`] from a clone of this client and `request`.
    pub fn batch_write_row(&self, request: BatchWriteRowRequest) -> BatchWriteRowOperation {
        BatchWriteRowOperation::new(self.clone(), request)
    }
1024
    /// Builds a [`BulkImportOperation`] from a clone of this client and `request`.
    pub fn bulk_import(&self, request: BulkImportRequest) -> BulkImportOperation {
        BulkImportOperation::new(self.clone(), request)
    }
1054
    /// Builds a [`BulkExportOperation`] from a clone of this client and `request`.
    pub fn bulk_export(&self, request: BulkExportRequest) -> BulkExportOperation {
        BulkExportOperation::new(self.clone(), request)
    }
1076
    /// Builds a [`CreateIndexOperation`] from a clone of this client and `request`.
    pub fn create_index(&self, request: CreateIndexRequest) -> CreateIndexOperation {
        CreateIndexOperation::new(self.clone(), request)
    }
1081
    /// Builds a [`DropIndexOperation`] for index `idx_name` of `table_name` from a clone of this client.
    pub fn drop_index(&self, table_name: &str, idx_name: &str) -> DropIndexOperation {
        DropIndexOperation::new(self.clone(), table_name, idx_name)
    }
1086
    /// Builds a [`ListSearchIndexOperation`] from a clone of this client and the optional `table_name`.
    pub fn list_search_index(&self, table_name: Option<&str>) -> ListSearchIndexOperation {
        ListSearchIndexOperation::new(self.clone(), table_name)
    }
1091
    /// Builds a [`CreateSearchIndexOperation`] from a clone of this client and `request`.
    pub fn create_search_index(&self, request: CreateSearchIndexRequest) -> CreateSearchIndexOperation {
        CreateSearchIndexOperation::new(self.clone(), request)
    }
1096
    /// Builds a [`DescribeSearchIndexOperation`] for index `index_name` of `table_name` from a clone of this client.
    pub fn describe_search_index(&self, table_name: &str, index_name: &str) -> DescribeSearchIndexOperation {
        DescribeSearchIndexOperation::new(self.clone(), table_name, index_name)
    }
1101
    /// Builds an [`UpdateSearchIndexOperation`] from a clone of this client and `request`.
    pub fn update_search_index(&self, request: UpdateSearchIndexRequest) -> UpdateSearchIndexOperation {
        UpdateSearchIndexOperation::new(self.clone(), request)
    }
1106
    /// Builds a [`DeleteSearchIndexOperation`] for index `index_name` of `table_name` from a clone of this client.
    pub fn delete_search_index(&self, table_name: &str, index_name: &str) -> DeleteSearchIndexOperation {
        DeleteSearchIndexOperation::new(self.clone(), table_name, index_name)
    }
1111
    /// Builds a [`SearchOperation`] from a clone of this client and `request`.
    pub fn search(&self, request: SearchRequest) -> SearchOperation {
        SearchOperation::new(self.clone(), request)
    }
1116
    /// Builds a [`ComputeSplitsOperation`] for index `index_name` of `table_name` from a clone of this client.
    pub fn compute_splits(&self, table_name: &str, index_name: &str) -> ComputeSplitsOperation {
        ComputeSplitsOperation::new(self.clone(), table_name, index_name)
    }
1121
    /// Builds a [`ParallelScanOperation`] from a clone of this client and `request`.
    pub fn parallel_scan(&self, request: ParallelScanRequest) -> ParallelScanOperation {
        ParallelScanOperation::new(self.clone(), request)
    }
1126
    /// Builds a [`GetTimeseriesDataOperation`] from a clone of this client and `request`.
    pub fn get_timeseries_data(&self, request: GetTimeseriesDataRequest) -> GetTimeseriesDataOperation {
        GetTimeseriesDataOperation::new(self.clone(), request)
    }
1131
    /// Builds a [`PutTimeseriesDataOperation`] from a clone of this client and `request`.
    pub fn put_timeseries_data(&self, request: PutTimeseriesDataRequest) -> PutTimeseriesDataOperation {
        PutTimeseriesDataOperation::new(self.clone(), request)
    }
1166
    /// Builds a [`CreateTimeseriesTableOperation`] from a clone of this client and `request`.
    pub fn create_timeseries_table(&self, request: CreateTimeseriesTableRequest) -> CreateTimeseriesTableOperation {
        CreateTimeseriesTableOperation::new(self.clone(), request)
    }
1171
    /// Builds a [`DescribeTimeseriesTableOperation`] for `table_name` from a clone of this client.
    pub fn describe_timeseries_table(&self, table_name: &str) -> DescribeTimeseriesTableOperation {
        DescribeTimeseriesTableOperation::new(self.clone(), table_name)
    }
1176
    /// Builds a [`ListTimeseriesTableOperation`] from a clone of this client.
    pub fn list_timeseries_table(&self) -> ListTimeseriesTableOperation {
        ListTimeseriesTableOperation::new(self.clone())
    }
1181
    /// Builds an [`UpdateTimeseriesTableOperation`] from a clone of this client and `request`.
    pub fn update_timeseries_table(&self, request: UpdateTimeseriesTableRequest) -> UpdateTimeseriesTableOperation {
        UpdateTimeseriesTableOperation::new(self.clone(), request)
    }
1186
    /// Builds a [`DeleteTimeseriesTableOperation`] for `table_name` from a clone of this client.
    pub fn delete_timeseries_table(&self, table_name: &str) -> DeleteTimeseriesTableOperation {
        DeleteTimeseriesTableOperation::new(self.clone(), table_name)
    }
1191
    /// Builds a [`CreateTimeseriesLastpointIndexOperation`] from a clone of this client and `request`.
    pub fn create_timeseries_lastpoint_index(&self, request: CreateTimeseriesLastpointIndexRequest) -> CreateTimeseriesLastpointIndexOperation {
        CreateTimeseriesLastpointIndexOperation::new(self.clone(), request)
    }
1196
    /// Builds a [`DeleteTimeseriesLastpointIndexOperation`] for index `index_name` of `table_name` from a clone of this client.
    pub fn delete_timeseries_lastpoint_index(&self, table_name: &str, index_name: &str) -> DeleteTimeseriesLastpointIndexOperation {
        DeleteTimeseriesLastpointIndexOperation::new(self.clone(), table_name, index_name)
    }
1201
    /// Builds a [`CreateTimeseriesAnalyticalStoreOperation`] from a clone of this client and `request`.
    pub fn create_timeseries_analytical_store(&self, request: CreateTimeseriesAnalyticalStoreRequest) -> CreateTimeseriesAnalyticalStoreOperation {
        CreateTimeseriesAnalyticalStoreOperation::new(self.clone(), request)
    }
1206
    /// Builds an [`UpdateTimeseriesAnalyticalStoreOperation`] from a clone of this client and `request`.
    pub fn update_timeseries_analytical_store(&self, request: UpdateTimeseriesAnalyticalStoreRequest) -> UpdateTimeseriesAnalyticalStoreOperation {
        UpdateTimeseriesAnalyticalStoreOperation::new(self.clone(), request)
    }
1211
    /// Builds a [`DeleteTimeseriesAnalyticalStoreOperation`] from a clone of this client and `request`.
    pub fn delete_timeseries_analytical_store(&self, request: DeleteTimeseriesAnalyticalStoreRequest) -> DeleteTimeseriesAnalyticalStoreOperation {
        DeleteTimeseriesAnalyticalStoreOperation::new(self.clone(), request)
    }
1216
    /// Builds a [`DescribeTimeseriesAnalyticalStoreOperation`] for store `store_name` of `table_name` from a clone of this client.
    pub fn describe_timeseries_analytical_store(&self, table_name: &str, store_name: &str) -> DescribeTimeseriesAnalyticalStoreOperation {
        DescribeTimeseriesAnalyticalStoreOperation::new(self.clone(), table_name, store_name)
    }
1221
    /// Builds a [`QueryTimeseriesMetaOperation`] from a clone of this client and `request`.
    pub fn query_timeseries_meta(&self, request: QueryTimeseriesMetaRequest) -> QueryTimeseriesMetaOperation {
        QueryTimeseriesMetaOperation::new(self.clone(), request)
    }
1226
    /// Builds an [`UpdateTimeseriesMetaOperation`] from a clone of this client and `request`.
    pub fn update_timeseries_meta(&self, request: UpdateTimeseriesMetaRequest) -> UpdateTimeseriesMetaOperation {
        UpdateTimeseriesMetaOperation::new(self.clone(), request)
    }
1231
    /// Builds a [`DeleteTimeseriesMetaOperation`] from a clone of this client and `request`.
    pub fn delete_timeseries_meta(&self, request: DeleteTimeseriesMetaRequest) -> DeleteTimeseriesMetaOperation {
        DeleteTimeseriesMetaOperation::new(self.clone(), request)
    }
1236
    /// Builds a [`SplitTimeseriesScanTaskOperation`] from a clone of this client and `request`.
    pub fn split_timeseries_scan_task(&self, request: SplitTimeseriesScanTaskRequest) -> SplitTimeseriesScanTaskOperation {
        SplitTimeseriesScanTaskOperation::new(self.clone(), request)
    }
1241
    /// Builds a [`ScanTimeseriesDataOperation`] from a clone of this client and `request`.
    pub fn scan_timeseries_data(&self, request: ScanTimeseriesDataRequest) -> ScanTimeseriesDataOperation {
        ScanTimeseriesDataOperation::new(self.clone(), request)
    }
1246
    /// Builds a [`SqlQueryOperation`] from a clone of this client and `request`.
    pub fn sql_query(&self, request: SqlQueryRequest) -> SqlQueryOperation {
        SqlQueryOperation::new(self.clone(), request)
    }
1260}