#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, ClientBuilderExt, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
const BASE_URL: &str = "https://api.mailgun.net/v3";
const ENV_KEY: &str = "MAILGUN_API_KEY";
fn apply_auth(
config: &reflow_actor::ActorConfig,
mut builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
let credential = config
.get_config_or_env(ENV_KEY)
.ok_or_else(|| anyhow::anyhow!("Missing env var: {}", ENV_KEY))?;
builder = builder.header("Authorization", format!("Basic {}", credential));
Ok(builder)
}
#[actor(
MailgunSendMessageActor,
inports::<100>(domain, from, to, subject, text),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_send_message(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/messages".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /{{domain}}/messages failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunReadMessageActor,
inports::<100>(domain, storage_key),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_read_message(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/messages/{storage_key}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("storage_key") {
endpoint = endpoint.replace("{{storage_key}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /{{domain}}/messages/{{storage_key}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MailgunDeleteMessageActor,
inports::<100>(domain, storage_key),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_delete_message(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/messages/{storage_key}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("storage_key") {
endpoint = endpoint.replace("{{storage_key}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /{{domain}}/messages/{{storage_key}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MailgunListDomainsActor,
inports::<100>(limit, skip, state),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_list_domains(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/domains".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("skip") {
query_pairs.push(("skip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("state") {
query_pairs.push(("state", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /domains failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunReadDomainActor,
inports::<100>(domain),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_read_domain(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/domains/{domain}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /domains/{{domain}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunCreateDomainActor,
inports::<100>(name, smtp_password, spam_action, wildcard, force_dkim_authority, dkim_key_size),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_create_domain(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/domains".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("smtp_password") {
body.insert("smtp_password".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("spam_action") {
body.insert("spam_action".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("wildcard") {
body.insert("wildcard".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("force_dkim_authority") {
body.insert("force_dkim_authority".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("dkim_key_size") {
body.insert("dkim_key_size".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /domains failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunDeleteDomainActor,
inports::<100>(domain),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_delete_domain(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/domains/{domain}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /domains/{{domain}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunReadDomainDnsRecordsActor,
inports::<100>(domain),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_read_domain_dns_records(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/domains/{domain}/connection".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /domains/{{domain}}/connection failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunUpdateDomainActor,
inports::<100>(domain, require_tls, skip_verification),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_update_domain(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/domains/{domain}/connection".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("require_tls") {
body.insert("require_tls".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("skip_verification") {
body.insert("skip_verification".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /domains/{{domain}}/connection failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunListLogsActor,
inports::<100>(domain, limit, skip),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_list_logs(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/log".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("skip") {
query_pairs.push(("skip", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /{{domain}}/log failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunListEventsActor,
inports::<100>(domain, begin, end, ascending, limit, event, recipient, tags, message_id, subject),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_list_events(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/events".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("begin") {
query_pairs.push(("begin", super::message_to_str(val)));
}
if let Some(val) = inputs.get("end") {
query_pairs.push(("end", super::message_to_str(val)));
}
if let Some(val) = inputs.get("ascending") {
query_pairs.push(("ascending", super::message_to_str(val)));
}
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("event") {
query_pairs.push(("event", super::message_to_str(val)));
}
if let Some(val) = inputs.get("recipient") {
query_pairs.push(("recipient", super::message_to_str(val)));
}
if let Some(val) = inputs.get("tags") {
query_pairs.push(("tags", super::message_to_str(val)));
}
if let Some(val) = inputs.get("message_id") {
query_pairs.push(("message-id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("subject") {
query_pairs.push(("subject", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /{{domain}}/events failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunListBouncesActor,
inports::<100>(domain, limit, p),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_list_bounces(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/bounces".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("p") {
query_pairs.push(("p", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /{{domain}}/bounces failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunReadBounceActor,
inports::<100>(domain, address),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_read_bounce(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/bounces/{address}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /{{domain}}/bounces/{{address}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunCreateBounceActor,
inports::<100>(domain, address, code, error),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_create_bounce(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/bounces".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("address") {
body.insert("address".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("code") {
body.insert("code".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("error") {
body.insert("error".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /{{domain}}/bounces failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunDeleteBounceActor,
inports::<100>(domain, address),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_delete_bounce(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/bounces/{address}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /{{domain}}/bounces/{{address}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MailgunDeleteBouncesActor,
inports::<100>(domain),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_delete_bounces(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/bounces".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /{{domain}}/bounces failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunListUnsubscribesActor,
inports::<100>(domain, limit, p),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_list_unsubscribes(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/unsubscribes".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("p") {
query_pairs.push(("p", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /{{domain}}/unsubscribes failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunReadUnsubscribeActor,
inports::<100>(domain, address),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_read_unsubscribe(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/unsubscribes/{address}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /{{domain}}/unsubscribes/{{address}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MailgunCreateUnsubscribeActor,
inports::<100>(domain, address, tag),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_create_unsubscribe(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/unsubscribes".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("address") {
body.insert("address".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("tag") {
body.insert("tag".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /{{domain}}/unsubscribes failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunDeleteUnsubscribeActor,
inports::<100>(domain, address),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_delete_unsubscribe(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/unsubscribes/{address}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /{{domain}}/unsubscribes/{{address}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MailgunListComplaintsActor,
inports::<100>(domain, limit, p),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_list_complaints(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/complaints".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("p") {
query_pairs.push(("p", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /{{domain}}/complaints failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunCreateComplaintActor,
inports::<100>(domain, address),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_create_complaint(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/complaints".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("address") {
body.insert("address".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /{{domain}}/complaints failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunDeleteComplaintActor,
inports::<100>(domain, address),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_delete_complaint(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/complaints/{address}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /{{domain}}/complaints/{{address}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MailgunListRoutesActor,
inports::<100>(limit, skip),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_list_routes(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/routes".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("skip") {
query_pairs.push(("skip", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /routes failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunReadRouteActor,
inports::<100>(id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_read_route(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/routes/{id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /routes/{{id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunCreateRouteActor,
inports::<100>(priority, description, expression, action),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_create_route(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/routes".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("priority") {
body.insert("priority".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("expression") {
body.insert("expression".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("action") {
body.insert("action".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /routes failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunUpdateRouteActor,
inports::<100>(id, priority, description, expression, action),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_update_route(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/routes/{id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("priority") {
body.insert("priority".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("expression") {
body.insert("expression".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("action") {
body.insert("action".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /routes/{{id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunDeleteRouteActor,
inports::<100>(id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_delete_route(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/routes/{id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /routes/{{id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunListMailingListsActor,
inports::<100>(limit, p),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_list_mailing_lists(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/lists/pages".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("p") {
query_pairs.push(("p", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /lists/pages failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunReadMailingListActor,
inports::<100>(address),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_read_mailing_list(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/lists/{address}".to_string();
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /lists/{{address}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunCreateMailingListActor,
inports::<100>(address, name, description, access_level, reply_preference),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_create_mailing_list(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/lists".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("address") {
body.insert("address".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("access_level") {
body.insert("access_level".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("reply_preference") {
body.insert("reply_preference".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /lists failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunUpdateMailingListActor,
inports::<100>(address, name, description, access_level),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_update_mailing_list(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/lists/{address}".to_string();
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("access_level") {
body.insert("access_level".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /lists/{{address}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunDeleteMailingListActor,
inports::<100>(address),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_delete_mailing_list(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/lists/{address}".to_string();
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /lists/{{address}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunListMailingListMembersActor,
inports::<100>(address, limit, subscribed, p),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_list_mailing_list_members(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/lists/{address}/members/pages".to_string();
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("subscribed") {
query_pairs.push(("subscribed", super::message_to_str(val)));
}
if let Some(val) = inputs.get("p") {
query_pairs.push(("p", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /lists/{{address}}/members/pages failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MailgunCreateMailingListMemberActor,
inports::<100>(address, name, vars, subscribed, upsert),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_create_mailing_list_member(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/lists/{address}/members".to_string();
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("address") {
body.insert("address".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("vars") {
body.insert("vars".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("subscribed") {
body.insert("subscribed".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("upsert") {
body.insert("upsert".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /lists/{{address}}/members failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunUpdateMailingListMemberActor,
inports::<100>(address, member_address, name, vars, subscribed),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_update_mailing_list_member(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/lists/{address}/members/{member_address}".to_string();
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("member_address") {
endpoint = endpoint.replace("{{member_address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("vars") {
body.insert("vars".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("subscribed") {
body.insert("subscribed".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"PUT /lists/{{address}}/members/{{member_address}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MailgunDeleteMailingListMemberActor,
inports::<100>(address, member_address),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_delete_mailing_list_member(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/lists/{address}/members/{member_address}".to_string();
if let Some(val) = inputs.get("address") {
endpoint = endpoint.replace("{{address}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("member_address") {
endpoint = endpoint.replace("{{member_address}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /lists/{{address}}/members/{{member_address}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
MailgunListTemplatesActor,
inports::<100>(domain, p, limit),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_list_templates(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/templates".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("p") {
query_pairs.push(("p", super::message_to_str(val)));
}
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /{{domain}}/templates failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunReadTemplateActor,
inports::<100>(domain, name, active),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_read_template(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/templates/{name}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("name") {
endpoint = endpoint.replace("{{name}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("active") {
query_pairs.push(("active", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /{{domain}}/templates/{{name}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunCreateTemplateActor,
inports::<100>(domain, name, description, template, tag, engine),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_create_template(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/templates".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("template") {
body.insert("template".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("tag") {
body.insert("tag".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("engine") {
body.insert("engine".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /{{domain}}/templates failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunUpdateTemplateActor,
inports::<100>(domain, name, description),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_update_template(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/templates/{name}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("name") {
endpoint = endpoint.replace("{{name}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /{{domain}}/templates/{{name}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunDeleteTemplateActor,
inports::<100>(domain, name),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_delete_template(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/templates/{name}".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("name") {
endpoint = endpoint.replace("{{name}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /{{domain}}/templates/{{name}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
MailgunReadEmailValidationActor,
inports::<100>(address, provider_lookup),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_read_email_validation(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/v4/address/validate".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("address") {
query_pairs.push(("address", super::message_to_str(val)));
}
if let Some(val) = inputs.get("provider_lookup") {
query_pairs.push(("provider_lookup", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /v4/address/validate failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
MailgunReadDomainStatsActor,
inports::<100>(domain, event, start, end, resolution, duration),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn mailgun_read_domain_stats(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{domain}/stats/total".to_string();
if let Some(val) = inputs.get("domain") {
endpoint = endpoint.replace("{{domain}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("event") {
query_pairs.push(("event", super::message_to_str(val)));
}
if let Some(val) = inputs.get("start") {
query_pairs.push(("start", super::message_to_str(val)));
}
if let Some(val) = inputs.get("end") {
query_pairs.push(("end", super::message_to_str(val)));
}
if let Some(val) = inputs.get("resolution") {
query_pairs.push(("resolution", super::message_to_str(val)));
}
if let Some(val) = inputs.get("duration") {
query_pairs.push(("duration", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /{{domain}}/stats/total failed: {}", e).into()),
);
}
}
Ok(output)
}