#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, ClientBuilderExt, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
const BASE_URL: &str = "https://cloud.mongodb.com/api/atlas/v2";
const ENV_KEY: &str = "MONGODB_ATLAS_API_KEY";
fn apply_auth(
config: &reflow_actor::ActorConfig,
mut builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
let credential = config
.get_config_or_env(ENV_KEY)
.ok_or_else(|| anyhow::anyhow!("Missing env var: {}", ENV_KEY))?;
builder = builder.header("Authorization", format!("Bearer {}", credential));
Ok(builder)
}
#[actor(
MongodbAtlasListClustersActor,
inports::<100>(groupId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_clusters(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /groups/{{groupId}}/clusters failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasCreateClusterActor,
inports::<100>(groupId, name),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_create_cluster(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /groups/{{groupId}}/clusters failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasListProjectsActor,
inports::<100>(pageNum, itemsPerPage, includeCount),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_projects(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/groups".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("pageNum") {
query_pairs.push(("pageNum", super::message_to_str(val)));
}
if let Some(val) = inputs.get("itemsPerPage") {
query_pairs.push(("itemsPerPage", super::message_to_str(val)));
}
if let Some(val) = inputs.get("includeCount") {
query_pairs.push(("includeCount", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /groups failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasCreateProjectActor,
inports::<100>(name, orgId, projectOwnerId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_create_project(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/groups".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("orgId") {
body.insert("orgId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("projectOwnerId") {
body.insert("projectOwnerId".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /groups failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasReadProjectActor,
inports::<100>(groupId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_read_project(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /groups/{{groupId}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasDeleteProjectActor,
inports::<100>(groupId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_delete_project(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /groups/{{groupId}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasReadClusterActor,
inports::<100>(groupId, clusterName),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_read_cluster(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters/{clusterName}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /groups/{{groupId}}/clusters/{{clusterName}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasUpdateClusterActor,
inports::<100>(groupId, clusterName, providerSettings, diskSizeGB, autoScaling),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_update_cluster(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters/{clusterName}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("providerSettings") {
body.insert("providerSettings".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("diskSizeGB") {
body.insert("diskSizeGB".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("autoScaling") {
body.insert("autoScaling".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"PATCH /groups/{{groupId}}/clusters/{{clusterName}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasDeleteClusterActor,
inports::<100>(groupId, clusterName),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_delete_cluster(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters/{clusterName}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /groups/{{groupId}}/clusters/{{clusterName}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasStopClusterActor,
inports::<100>(groupId, clusterName),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_stop_cluster(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters/{clusterName}/pauseCollection".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("POST /groups/{{groupId}}/clusters/{{clusterName}}/pauseCollection failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
MongodbAtlasListDatabaseUsersActor,
inports::<100>(groupId, pageNum, itemsPerPage),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_database_users(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/databaseUsers".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("pageNum") {
query_pairs.push(("pageNum", super::message_to_str(val)));
}
if let Some(val) = inputs.get("itemsPerPage") {
query_pairs.push(("itemsPerPage", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /groups/{{groupId}}/databaseUsers failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasCreateDatabaseUserActor,
inports::<100>(groupId, databaseName, username, password, roles, scopes),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_create_database_user(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/databaseUsers".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("databaseName") {
body.insert("databaseName".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("username") {
body.insert("username".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("password") {
body.insert("password".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("roles") {
body.insert("roles".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("scopes") {
body.insert("scopes".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("POST /groups/{{groupId}}/databaseUsers failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasReadDatabaseUserActor,
inports::<100>(groupId, username),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_read_database_user(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/databaseUsers/admin/{username}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("username") {
endpoint = endpoint.replace("{{username}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /groups/{{groupId}}/databaseUsers/admin/{{username}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasUpdateDatabaseUserActor,
inports::<100>(groupId, username, password, roles),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_update_database_user(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/databaseUsers/admin/{username}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("username") {
endpoint = endpoint.replace("{{username}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("password") {
body.insert("password".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("roles") {
body.insert("roles".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"PATCH /groups/{{groupId}}/databaseUsers/admin/{{username}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasDeleteDatabaseUserActor,
inports::<100>(groupId, username),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_delete_database_user(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/databaseUsers/admin/{username}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("username") {
endpoint = endpoint.replace("{{username}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /groups/{{groupId}}/databaseUsers/admin/{{username}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasListProjectIpAccessListsActor,
inports::<100>(groupId, pageNum, itemsPerPage),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_project_ip_access_lists(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/accessList".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("pageNum") {
query_pairs.push(("pageNum", super::message_to_str(val)));
}
if let Some(val) = inputs.get("itemsPerPage") {
query_pairs.push(("itemsPerPage", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /groups/{{groupId}}/accessList failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasCreateProjectIpAccessListActor,
inports::<100>(groupId, cidrBlock, ipAddress, comment),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_create_project_ip_access_list(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/accessList".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("cidrBlock") {
body.insert("cidrBlock".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("ipAddress") {
body.insert("ipAddress".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("comment") {
body.insert("comment".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /groups/{{groupId}}/accessList failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasDeleteProjectIpAccessListActor,
inports::<100>(groupId, entryValue),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_delete_project_ip_access_list(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/accessList/{entryValue}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("entryValue") {
endpoint = endpoint.replace("{{entryValue}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /groups/{{groupId}}/accessList/{{entryValue}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasListOrganizationsActor,
inports::<100>(pageNum, itemsPerPage),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_organizations(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/orgs".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("pageNum") {
query_pairs.push(("pageNum", super::message_to_str(val)));
}
if let Some(val) = inputs.get("itemsPerPage") {
query_pairs.push(("itemsPerPage", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /orgs failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasReadOrganizationActor,
inports::<100>(orgId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_read_organization(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/orgs/{orgId}".to_string();
if let Some(val) = inputs.get("orgId") {
endpoint = endpoint.replace("{{orgId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /orgs/{{orgId}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasDeleteOrganizationActor,
inports::<100>(orgId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_delete_organization(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/orgs/{orgId}".to_string();
if let Some(val) = inputs.get("orgId") {
endpoint = endpoint.replace("{{orgId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /orgs/{{orgId}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasListSnapshotsActor,
inports::<100>(groupId, clusterName, pageNum, itemsPerPage),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_snapshots(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters/{clusterName}/backup/snapshots".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("pageNum") {
query_pairs.push(("pageNum", super::message_to_str(val)));
}
if let Some(val) = inputs.get("itemsPerPage") {
query_pairs.push(("itemsPerPage", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /groups/{{groupId}}/clusters/{{clusterName}}/backup/snapshots failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
MongodbAtlasCreateSnapshotActor,
inports::<100>(groupId, clusterName, description, retentionInDays),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_create_snapshot(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters/{clusterName}/backup/snapshots".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("retentionInDays") {
body.insert("retentionInDays".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("POST /groups/{{groupId}}/clusters/{{clusterName}}/backup/snapshots failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
MongodbAtlasDeleteSnapshotActor,
inports::<100>(groupId, clusterName, snapshotId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_delete_snapshot(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint =
"/groups/{groupId}/clusters/{clusterName}/backup/snapshots/{snapshotId}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("snapshotId") {
endpoint = endpoint.replace("{{snapshotId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("DELETE /groups/{{groupId}}/clusters/{{clusterName}}/backup/snapshots/{{snapshotId}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
MongodbAtlasCreateRestoreJobActor,
inports::<100>(groupId, clusterName, snapshotId, deliveryType, targetClusterName, targetGroupId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_create_restore_job(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters/{clusterName}/backup/restoreJobs".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("snapshotId") {
body.insert("snapshotId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("deliveryType") {
body.insert("deliveryType".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("targetClusterName") {
body.insert("targetClusterName".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("targetGroupId") {
body.insert("targetGroupId".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("POST /groups/{{groupId}}/clusters/{{clusterName}}/backup/restoreJobs failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
MongodbAtlasListRestoreJobsActor,
inports::<100>(groupId, clusterName),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_restore_jobs(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters/{clusterName}/backup/restoreJobs".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /groups/{{groupId}}/clusters/{{clusterName}}/backup/restoreJobs failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
MongodbAtlasListApiKeysActor,
inports::<100>(orgId, pageNum, itemsPerPage),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_api_keys(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/orgs/{orgId}/apiKeys".to_string();
if let Some(val) = inputs.get("orgId") {
endpoint = endpoint.replace("{{orgId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("pageNum") {
query_pairs.push(("pageNum", super::message_to_str(val)));
}
if let Some(val) = inputs.get("itemsPerPage") {
query_pairs.push(("itemsPerPage", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /orgs/{{orgId}}/apiKeys failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasCreateApiKeyActor,
inports::<100>(orgId, desc, roles),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_create_api_key(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/orgs/{orgId}/apiKeys".to_string();
if let Some(val) = inputs.get("orgId") {
endpoint = endpoint.replace("{{orgId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("desc") {
body.insert("desc".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("roles") {
body.insert("roles".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /orgs/{{orgId}}/apiKeys failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasDeleteApiKeyActor,
inports::<100>(orgId, apiUserId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_delete_api_key(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/orgs/{orgId}/apiKeys/{apiUserId}".to_string();
if let Some(val) = inputs.get("orgId") {
endpoint = endpoint.replace("{{orgId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("apiUserId") {
endpoint = endpoint.replace("{{apiUserId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /orgs/{{orgId}}/apiKeys/{{apiUserId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasListProjectAlertsActor,
inports::<100>(groupId, status),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_project_alerts(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/alerts".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("status") {
query_pairs.push(("status", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /groups/{{groupId}}/alerts failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasReadProjectAlertActor,
inports::<100>(groupId, alertId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_read_project_alert(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/alerts/{alertId}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("alertId") {
endpoint = endpoint.replace("{{alertId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /groups/{{groupId}}/alerts/{{alertId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasListAlertConfigsActor,
inports::<100>(groupId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_alert_configs(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/alertConfigs".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /groups/{{groupId}}/alertConfigs failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasCreateAlertConfigActor,
inports::<100>(groupId, eventTypeName, enabled, notifications, matchers, threshold),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_create_alert_config(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/alertConfigs".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("eventTypeName") {
body.insert("eventTypeName".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("enabled") {
body.insert("enabled".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("notifications") {
body.insert("notifications".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("matchers") {
body.insert("matchers".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("threshold") {
body.insert("threshold".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("POST /groups/{{groupId}}/alertConfigs failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasUpdateAlertConfigActor,
inports::<100>(groupId, alertConfigId, enabled, notifications),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_update_alert_config(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/alertConfigs/{alertConfigId}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("alertConfigId") {
endpoint = endpoint.replace("{{alertConfigId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("enabled") {
body.insert("enabled".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("notifications") {
body.insert("notifications".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"PUT /groups/{{groupId}}/alertConfigs/{{alertConfigId}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasDeleteAlertConfigActor,
inports::<100>(groupId, alertConfigId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_delete_alert_config(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/alertConfigs/{alertConfigId}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("alertConfigId") {
endpoint = endpoint.replace("{{alertConfigId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /groups/{{groupId}}/alertConfigs/{{alertConfigId}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasListProjectEventsActor,
inports::<100>(groupId, eventType, minDate, maxDate),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_project_events(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/events".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("eventType") {
query_pairs.push(("eventType", super::message_to_str(val)));
}
if let Some(val) = inputs.get("minDate") {
query_pairs.push(("minDate", super::message_to_str(val)));
}
if let Some(val) = inputs.get("maxDate") {
query_pairs.push(("maxDate", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /groups/{{groupId}}/events failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasListClusterMetricsActor,
inports::<100>(groupId, processId, granularity, period, m),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_cluster_metrics(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/processes/{processId}/measurements".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("processId") {
endpoint = endpoint.replace("{{processId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("granularity") {
query_pairs.push(("granularity", super::message_to_str(val)));
}
if let Some(val) = inputs.get("period") {
query_pairs.push(("period", super::message_to_str(val)));
}
if let Some(val) = inputs.get("m") {
query_pairs.push(("m", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /groups/{{groupId}}/processes/{{processId}}/measurements failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasListProjectUsersActor,
inports::<100>(groupId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_project_users(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/users".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /groups/{{groupId}}/users failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasDeleteProjectUserActor,
inports::<100>(groupId, userId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_delete_project_user(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/users/{userId}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("userId") {
endpoint = endpoint.replace("{{userId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /groups/{{groupId}}/users/{{userId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasListSearchIndexesActor,
inports::<100>(groupId, clusterName, databaseName, collectionName),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_list_search_indexes(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint =
"/groups/{groupId}/clusters/{clusterName}/fts/indexes/{databaseName}/{collectionName}"
.to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("databaseName") {
endpoint = endpoint.replace("{{databaseName}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("collectionName") {
endpoint = endpoint.replace("{{collectionName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /groups/{{groupId}}/clusters/{{clusterName}}/fts/indexes/{{databaseName}}/{{collectionName}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
MongodbAtlasCreateSearchIndexActor,
inports::<100>(groupId, clusterName, collectionName, database, name, mappings),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_create_search_index(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters/{clusterName}/fts/indexes".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("collectionName") {
body.insert("collectionName".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("database") {
body.insert("database".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("mappings") {
body.insert("mappings".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"POST /groups/{{groupId}}/clusters/{{clusterName}}/fts/indexes failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MongodbAtlasUpdateSearchIndexActor,
inports::<100>(groupId, clusterName, indexId, mappings),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_update_search_index(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters/{clusterName}/fts/indexes/{indexId}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("indexId") {
endpoint = endpoint.replace("{{indexId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("mappings") {
body.insert("mappings".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("PATCH /groups/{{groupId}}/clusters/{{clusterName}}/fts/indexes/{{indexId}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
MongodbAtlasDeleteSearchIndexActor,
inports::<100>(groupId, clusterName, indexId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mongodb_atlas_delete_search_index(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/groups/{groupId}/clusters/{clusterName}/fts/indexes/{indexId}".to_string();
if let Some(val) = inputs.get("groupId") {
endpoint = endpoint.replace("{{groupId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("clusterName") {
endpoint = endpoint.replace("{{clusterName}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("indexId") {
endpoint = endpoint.replace("{{indexId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("DELETE /groups/{{groupId}}/clusters/{{clusterName}}/fts/indexes/{{indexId}} failed: {}", e).into()));
}
}
Ok(output)
}