use anyhow::{Context, Result};
use azure_core::auth::TokenCredential;
use serde::Deserialize;
use std::sync::Arc;
use super::diff::ContainerSpec;
/// Base URL that every request URL built in this file starts with.
const ARM: &str = "https://management.azure.com";
/// Value sent as the `api-version` query parameter on every request in this file.
const COSMOS_API_VERSION: &str = "2024-05-15";
/// Scope passed to the credential when requesting an access token.
const ARM_SCOPE: &str = "https://management.azure.com/.default";
/// Client for managing the SQL databases and containers of one Cosmos DB
/// account through `management.azure.com` REST calls.
///
/// The three string fields are placed into the request URL path as
/// `/subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers/Microsoft.DocumentDB/databaseAccounts/{account}/...`.
pub struct ArmCosmosClient {
/// Credential asked for an access token before each request.
pub credential: Arc<dyn TokenCredential>,
/// HTTP client used to send every request.
pub http: reqwest::Client,
/// Subscription identifier used in the request path.
pub subscription_id: String,
/// Resource group name used in the request path.
pub resource_group: String,
/// Database account name used in the request path.
pub account: String,
}
impl ArmCosmosClient {
    /// Requests an access token for [`ARM_SCOPE`] from the configured
    /// credential and returns its secret as an owned string.
    ///
    /// # Errors
    /// Fails with the context `acquire ARM access token` when the credential
    /// cannot produce a token.
    async fn token(&self) -> Result<String> {
        let token = self
            .credential
            .get_token(&[ARM_SCOPE])
            .await
            .context("acquire ARM access token")?;
        Ok(token.token.secret().to_string())
    }

    /// Builds the request URL for a resource below this account's
    /// `sqlDatabases` collection.
    ///
    /// `tail` holds the path segments that follow `sqlDatabases` (for example
    /// `[db, "containers"]`). Every segment is appended through
    /// `path_segments_mut`, which percent-encodes it, so a name containing
    /// `/`, `?`, `#`, `%` or a space stays a single path segment instead of
    /// altering the path or query of the request.
    fn sql_resource_url(&self, tail: &[&str]) -> Result<reqwest::Url> {
        let mut url = reqwest::Url::parse(ARM).context("parse ARM base URL")?;
        url.path_segments_mut()
            .map_err(|()| anyhow::anyhow!("ARM base URL cannot be a base: {ARM}"))?
            .extend(&[
                "subscriptions",
                self.subscription_id.as_str(),
                "resourceGroups",
                self.resource_group.as_str(),
                "providers",
                "Microsoft.DocumentDB",
                "databaseAccounts",
                self.account.as_str(),
                "sqlDatabases",
            ])
            .extend(tail);
        url.query_pairs_mut()
            .append_pair("api-version", COSMOS_API_VERSION);
        Ok(url)
    }

    /// Sends an authenticated JSON `PUT` to `url` and turns any non-2xx
    /// status into an error that carries the status and the response body.
    ///
    /// `what` names the resource in the error message (e.g. `database foo`).
    async fn put_resource(
        &self,
        what: &str,
        url: &reqwest::Url,
        body: &serde_json::Value,
    ) -> Result<()> {
        let token = self.token().await?;
        let resp = self
            .http
            .put(url.clone())
            .bearer_auth(&token)
            .json(body)
            .send()
            .await
            .with_context(|| format!("PUT {url}"))?;
        let status = resp.status();
        if !status.is_success() {
            // Best effort: an unreadable error body must not hide the status.
            let body = resp.text().await.unwrap_or_default();
            anyhow::bail!("PUT {what} -> {url}: {status} - {body}");
        }
        Ok(())
    }

    /// Issues a `PUT` for the SQL database `db`, with `db` as the resource id.
    ///
    /// NOTE(review): every 2xx status, including 202, is treated as success;
    /// if the service completes the PUT asynchronously the database may not be
    /// usable yet when this returns — confirm callers tolerate that.
    ///
    /// # Errors
    /// Fails when no token can be acquired, the request cannot be sent, or
    /// the response status is not 2xx.
    pub async fn ensure_database(&self, db: &str) -> Result<()> {
        let url = self.sql_resource_url(&[db])?;
        let body = serde_json::json!({
            "properties": { "resource": { "id": db } }
        });
        self.put_resource(&format!("database {db}"), &url, &body)
            .await
    }

    /// Lists the containers of database `db` as [`ContainerSpec`] values.
    ///
    /// Only the first partition key path of each container is kept; a
    /// container reporting no paths is given `/id`. A `404` response yields an
    /// empty list (presumably the database does not exist yet — verify against
    /// callers).
    ///
    /// # Errors
    /// Fails when no token can be acquired, the request cannot be sent, the
    /// status is neither 2xx nor 404, or the response body does not match the
    /// expected JSON shape.
    pub async fn list_containers(&self, db: &str) -> Result<Vec<ContainerSpec>> {
        // Minimal view of the list response: only the fields read below.
        #[derive(Deserialize)]
        struct ListResp {
            value: Vec<ContainerArmRecord>,
        }
        #[derive(Deserialize)]
        struct ContainerArmRecord {
            name: String,
            properties: ContainerArmProps,
        }
        #[derive(Deserialize)]
        struct ContainerArmProps {
            resource: ContainerArmResource,
        }
        #[derive(Deserialize)]
        struct ContainerArmResource {
            #[serde(rename = "partitionKey")]
            partition_key: PartitionKey,
        }
        #[derive(Deserialize)]
        struct PartitionKey {
            paths: Vec<String>,
        }

        let url = self.sql_resource_url(&[db, "containers"])?;
        let token = self.token().await?;
        let resp = self
            .http
            .get(url.clone())
            .bearer_auth(&token)
            .send()
            .await
            .with_context(|| format!("GET {url}"))?;

        let status = resp.status();
        if status == reqwest::StatusCode::NOT_FOUND {
            return Ok(vec![]);
        }
        if !status.is_success() {
            // Best effort: an unreadable error body must not hide the status.
            let body = resp.text().await.unwrap_or_default();
            anyhow::bail!("LIST containers {url}: {status} - {body}");
        }

        let parsed: ListResp = resp.json().await.context("parse container list")?;
        Ok(parsed
            .value
            .into_iter()
            .map(|record| ContainerSpec {
                name: record.name,
                partition_key: record
                    .properties
                    .resource
                    .partition_key
                    .paths
                    .into_iter()
                    .next()
                    .unwrap_or_else(|| "/id".to_string()),
            })
            .collect())
    }

    /// Issues a `PUT` for container `spec.name` in database `db`, with a
    /// single `Hash` partition key path taken from `spec.partition_key`.
    ///
    /// NOTE(review): every 2xx status, including 202, is treated as success;
    /// if the service completes the PUT asynchronously the container may not
    /// be usable yet when this returns — confirm callers tolerate that.
    ///
    /// # Errors
    /// Fails when no token can be acquired, the request cannot be sent, or
    /// the response status is not 2xx.
    pub async fn create_container(&self, db: &str, spec: &ContainerSpec) -> Result<()> {
        let url = self.sql_resource_url(&[db, "containers", spec.name.as_str()])?;
        let body = serde_json::json!({
            "properties": {
                "resource": {
                    "id": spec.name,
                    "partitionKey": { "paths": [spec.partition_key], "kind": "Hash" }
                }
            }
        });
        self.put_resource(&format!("container {}", spec.name), &url, &body)
            .await
    }
}