azure_cosmos_mirror/clients/
cosmos.rs

1use crate::clients::DatabaseClient;
2use crate::operations::*;
3use crate::resources::permission::AuthorizationToken;
4use crate::resources::ResourceType;
5use crate::ReadonlyString;
6
7use azure_core::{ClientOptions, Context, Pipeline, Request, Response};
8
9use std::fmt::Debug;
10use std::sync::Arc;
11
12/// The well-known account key used by Azure Cosmos DB Emulator.
13/// https://docs.microsoft.com/azure/cosmos-db/local-emulator?tabs=ssl-netstd21#connect-with-emulator-apis
14pub const EMULATOR_ACCOUNT_KEY: &str =
15    "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
16
17/// A builder for the cosmos client.
18#[derive(Debug, Clone)]
19pub struct CosmosClientBuilder {
20    cloud_location: CloudLocation,
21    options: ClientOptions,
22}
23
24impl CosmosClientBuilder {
25    /// Create a new instance of `CosmosClientBuilder`.
26    #[must_use]
27    pub fn new(account: impl Into<String>, auth_token: AuthorizationToken) -> Self {
28        Self::with_location(CloudLocation::Public {
29            account: account.into(),
30            auth_token,
31        })
32    }
33
34    /// Create a new instance of `CosmosClientBuilder` with a cloud location.
35    #[must_use]
36    pub fn with_location(cloud_location: CloudLocation) -> Self {
37        Self {
38            options: ClientOptions::default(),
39            cloud_location,
40        }
41    }
42
43    /// Convert the builder into a `CosmosClient` instance.
44    #[must_use]
45    pub fn build(self) -> CosmosClient {
46        let auth_token = self.cloud_location.auth_token();
47        CosmosClient {
48            pipeline: new_pipeline_from_options(self.options, auth_token),
49            cloud_location: self.cloud_location,
50        }
51    }
52
53    /// Set the cloud location.
54    #[must_use]
55    pub fn cloud_location(mut self, cloud_location: CloudLocation) -> Self {
56        self.cloud_location = cloud_location;
57        self
58    }
59
60    /// Set the retry options.
61    #[must_use]
62    pub fn retry(mut self, retry: impl Into<azure_core::RetryOptions>) -> Self {
63        self.options = self.options.retry(retry);
64        self
65    }
66
67    /// Set the transport options.
68    #[must_use]
69    pub fn transport(mut self, transport: impl Into<azure_core::TransportOptions>) -> Self {
70        self.options = self.options.transport(transport);
71        self
72    }
73}
74
75/// A plain Cosmos client.
76#[derive(Debug, Clone)]
77pub struct CosmosClient {
78    pipeline: Pipeline,
79    cloud_location: CloudLocation,
80}
81
82impl CosmosClient {
83    /// Create a new `CosmosClient` which connects to the account's instance in the public Azure cloud.
84    #[must_use]
85    pub fn new(account: impl Into<String>, auth_token: AuthorizationToken) -> Self {
86        CosmosClientBuilder::new(account, auth_token).build()
87    }
88
89    /// Create a new `CosmosClientBuilder`.
90    #[must_use]
91    pub fn builder(
92        account: impl Into<String>,
93        auth_token: AuthorizationToken,
94    ) -> CosmosClientBuilder {
95        CosmosClientBuilder::new(account, auth_token)
96    }
97
98    /// Set the auth token used
99    #[must_use]
100    pub fn auth_token(mut self, auth_token: AuthorizationToken) -> Self {
101        // we replace the AuthorizationPolicy. This is
102        // the last-1 policy by construction.
103        let auth_policy: Arc<dyn azure_core::Policy> =
104            Arc::new(crate::AuthorizationPolicy::new(auth_token));
105
106        self.pipeline
107            .replace_policy(auth_policy, self.pipeline.policies().len() - 2);
108        self
109    }
110
111    /// Create a database
112    pub fn create_database<S: AsRef<str>>(&self, database_name: S) -> CreateDatabaseBuilder {
113        CreateDatabaseBuilder::new(self.clone(), database_name.as_ref().to_owned())
114    }
115
116    /// List all databases
117    pub fn list_databases(&self) -> ListDatabasesBuilder {
118        ListDatabasesBuilder::new(self.clone())
119    }
120
121    /// Create a [`DatabaseClient`].
122    pub fn database_client<S: Into<ReadonlyString>>(&self, database_name: S) -> DatabaseClient {
123        DatabaseClient::new(self.clone(), database_name)
124    }
125
126    /// Prepares' an `azure_core::Request`.
127    ///
128    /// This function will add the cloud location to the URI suffix and generate
129    /// a Request with the specified HTTP Method. It will also set the body
130    /// to an empty `Bytes` instance.
131    pub(crate) fn request(&self, uri_path: &str, http_method: azure_core::Method) -> Request {
132        let uri = format!("{}/{}", self.cloud_location.url(), uri_path);
133        Request::new(uri.parse().unwrap(), http_method)
134    }
135
136    /// Sends a request through the pipeline
137    pub(crate) async fn send(
138        &self,
139        mut request: Request,
140        mut context: Context,
141        resource_type: ResourceType,
142    ) -> azure_core::Result<Response> {
143        self.pipeline
144            .send(context.insert(resource_type), &mut request)
145            .await
146    }
147
148    /// Access this client's pipeline
149    pub(crate) fn pipeline(&self) -> &Pipeline {
150        &self.pipeline
151    }
152}
153
154/// Create a Pipeline from CosmosOptions
155fn new_pipeline_from_options(
156    options: ClientOptions,
157    authorization_token: AuthorizationToken,
158) -> Pipeline {
159    let auth_policy: Arc<dyn azure_core::Policy> =
160        Arc::new(crate::AuthorizationPolicy::new(authorization_token));
161
162    // The `AuthorizationPolicy` must be the **last** retry policy.
163    // Policies can change the url and/or the headers, and the `AuthorizationPolicy`
164    // must be able to inspect them or the resulting token will be invalid.
165    let per_retry_policies = vec![auth_policy];
166
167    Pipeline::new(
168        option_env!("CARGO_PKG_NAME"),
169        option_env!("CARGO_PKG_VERSION"),
170        options,
171        Vec::new(),
172        per_retry_policies,
173    )
174}
175
176/// The cloud with which you want to interact.
177// TODO: Other govt clouds?
178#[derive(Debug, Clone)]
179pub enum CloudLocation {
180    /// Azure public cloud
181    Public {
182        account: String,
183        auth_token: AuthorizationToken,
184    },
185    /// Azure China cloud
186    China {
187        account: String,
188        auth_token: AuthorizationToken,
189    },
190    /// Use the well-known Cosmos emulator
191    Emulator { address: String, port: u16 },
192    /// A custom base URL
193    Custom {
194        uri: String,
195        auth_token: AuthorizationToken,
196    },
197}
198
199impl CloudLocation {
200    /// the base URL for a given cloud location
201    fn url(&self) -> String {
202        match self {
203            CloudLocation::Public { account, .. } => {
204                format!("https://{account}.documents.azure.com")
205            }
206            CloudLocation::China { account, .. } => format!("https://{account}.documents.azure.cn"),
207            CloudLocation::Custom { uri, .. } => uri.clone(),
208            CloudLocation::Emulator { address, port } => format!("https://{address}:{port}"),
209        }
210    }
211
212    fn auth_token(&self) -> AuthorizationToken {
213        match self {
214            CloudLocation::Public { auth_token, .. } => auth_token.clone(),
215            CloudLocation::China { auth_token, .. } => auth_token.clone(),
216            CloudLocation::Emulator { .. } => {
217                AuthorizationToken::primary_from_base64(EMULATOR_ACCOUNT_KEY).unwrap()
218            }
219            CloudLocation::Custom { auth_token, .. } => auth_token.clone(),
220        }
221    }
222}