#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, ClientBuilderExt, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
const BASE_URL: &str = "https://api.hubapi.com";
const ENV_KEY: &str = "HUBSPOT_API_KEY";
fn apply_auth(
config: &reflow_actor::ActorConfig,
mut builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
let credential = config
.get_config_or_env(ENV_KEY)
.ok_or_else(|| anyhow::anyhow!("Missing env var: {}", ENV_KEY))?;
builder = builder.header("Authorization", format!("Bearer {}", credential));
Ok(builder)
}
#[actor(
HubspotCreateContactActor,
inports::<100>(properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_contact(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/contacts".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /crm/v3/objects/contacts failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotReadContactActor,
inports::<100>(contactId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_read_contact(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/contacts/{contactId}".to_string();
if let Some(val) = inputs.get("contactId") {
endpoint = endpoint.replace("{{contactId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /crm/v3/objects/contacts/{{contactId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotSearchContactsActor,
inports::<100>(filterGroups),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_search_contacts(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/contacts/search".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("filterGroups") {
body.insert("filterGroups".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("POST /crm/v3/objects/contacts/search failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotListContactsActor,
inports::<100>(limit, after, properties, associations, archived),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_contacts(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/contacts".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("properties") {
query_pairs.push(("properties", super::message_to_str(val)));
}
if let Some(val) = inputs.get("associations") {
query_pairs.push(("associations", super::message_to_str(val)));
}
if let Some(val) = inputs.get("archived") {
query_pairs.push(("archived", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /crm/v3/objects/contacts failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotUpdateContactActor,
inports::<100>(contactId, properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_update_contact(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/contacts/{contactId}".to_string();
if let Some(val) = inputs.get("contactId") {
endpoint = endpoint.replace("{{contactId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("PATCH /crm/v3/objects/contacts/{{contactId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotDeleteContactActor,
inports::<100>(contactId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_delete_contact(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/contacts/{contactId}".to_string();
if let Some(val) = inputs.get("contactId") {
endpoint = endpoint.replace("{{contactId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /crm/v3/objects/contacts/{{contactId}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotListCompaniesActor,
inports::<100>(limit, after, properties, archived),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_companies(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/companies".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("properties") {
query_pairs.push(("properties", super::message_to_str(val)));
}
if let Some(val) = inputs.get("archived") {
query_pairs.push(("archived", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /crm/v3/objects/companies failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotReadCompanyActor,
inports::<100>(companyId, properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_read_company(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/companies/{companyId}".to_string();
if let Some(val) = inputs.get("companyId") {
endpoint = endpoint.replace("{{companyId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("properties") {
query_pairs.push(("properties", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /crm/v3/objects/companies/{{companyId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotCreateCompanyActor,
inports::<100>(properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_company(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/companies".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /crm/v3/objects/companies failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotUpdateCompanyActor,
inports::<100>(companyId, properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_update_company(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/companies/{companyId}".to_string();
if let Some(val) = inputs.get("companyId") {
endpoint = endpoint.replace("{{companyId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"PATCH /crm/v3/objects/companies/{{companyId}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotDeleteCompanyActor,
inports::<100>(companyId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_delete_company(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/companies/{companyId}".to_string();
if let Some(val) = inputs.get("companyId") {
endpoint = endpoint.replace("{{companyId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /crm/v3/objects/companies/{{companyId}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotListDealsActor,
inports::<100>(limit, after, properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_deals(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/deals".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("properties") {
query_pairs.push(("properties", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /crm/v3/objects/deals failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotReadDealActor,
inports::<100>(dealId, properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_read_deal(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/deals/{dealId}".to_string();
if let Some(val) = inputs.get("dealId") {
endpoint = endpoint.replace("{{dealId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("properties") {
query_pairs.push(("properties", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /crm/v3/objects/deals/{{dealId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotCreateDealActor,
inports::<100>(properties, associations),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_deal(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/deals".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("associations") {
body.insert("associations".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /crm/v3/objects/deals failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotUpdateDealActor,
inports::<100>(dealId, properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_update_deal(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/deals/{dealId}".to_string();
if let Some(val) = inputs.get("dealId") {
endpoint = endpoint.replace("{{dealId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("PATCH /crm/v3/objects/deals/{{dealId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotDeleteDealActor,
inports::<100>(dealId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_delete_deal(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/deals/{dealId}".to_string();
if let Some(val) = inputs.get("dealId") {
endpoint = endpoint.replace("{{dealId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /crm/v3/objects/deals/{{dealId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotListTicketsActor,
inports::<100>(limit, after, properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_tickets(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/tickets".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("properties") {
query_pairs.push(("properties", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /crm/v3/objects/tickets failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotCreateTicketActor,
inports::<100>(properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_ticket(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/tickets".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /crm/v3/objects/tickets failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotUpdateTicketActor,
inports::<100>(ticketId, properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_update_ticket(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/tickets/{ticketId}".to_string();
if let Some(val) = inputs.get("ticketId") {
endpoint = endpoint.replace("{{ticketId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("PATCH /crm/v3/objects/tickets/{{ticketId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotDeleteTicketActor,
inports::<100>(ticketId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_delete_ticket(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/tickets/{ticketId}".to_string();
if let Some(val) = inputs.get("ticketId") {
endpoint = endpoint.replace("{{ticketId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /crm/v3/objects/tickets/{{ticketId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotCreateAssociationActor,
inports::<100>(fromObjectType, fromObjectId, toObjectType, toObjectId, associationType),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_association(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/objects/{fromObjectType}/{fromObjectId}/associations/{toObjectType}/{toObjectId}/{associationType}".to_string();
if let Some(val) = inputs.get("fromObjectType") {
endpoint = endpoint.replace("{{fromObjectType}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("fromObjectId") {
endpoint = endpoint.replace("{{fromObjectId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("toObjectType") {
endpoint = endpoint.replace("{{toObjectType}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("toObjectId") {
endpoint = endpoint.replace("{{toObjectId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("associationType") {
endpoint = endpoint.replace("{{associationType}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("PUT /crm/v3/objects/{{fromObjectType}}/{{fromObjectId}}/associations/{{toObjectType}}/{{toObjectId}}/{{associationType}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
HubspotListAssociationsActor,
inports::<100>(objectType, objectId, toObjectType),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_associations(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint =
"/crm/v3/objects/{objectType}/{objectId}/associations/{toObjectType}".to_string();
if let Some(val) = inputs.get("objectType") {
endpoint = endpoint.replace("{{objectType}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("objectId") {
endpoint = endpoint.replace("{{objectId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("toObjectType") {
endpoint = endpoint.replace("{{toObjectType}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /crm/v3/objects/{{objectType}}/{{objectId}}/associations/{{toObjectType}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
HubspotSendEmailActor,
inports::<100>(emailId, message, contactProperties, customProperties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_send_email(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/marketing/v3/transactional/single-email/send".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("emailId") {
body.insert("emailId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("message") {
body.insert("message".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("contactProperties") {
body.insert("contactProperties".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("customProperties") {
body.insert("customProperties".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"POST /marketing/v3/transactional/single-email/send failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotListEmailSubscriptionsActor,
inports::<100>(trigger),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_email_subscriptions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/communication-preferences/v3/definitions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /communication-preferences/v3/definitions failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotReadEmailSubscriptionActor,
inports::<100>(emailAddress),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_read_email_subscription(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/communication-preferences/v3/status/email/{emailAddress}".to_string();
if let Some(val) = inputs.get("emailAddress") {
endpoint = endpoint.replace("{{emailAddress}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /communication-preferences/v3/status/email/{{emailAddress}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
HubspotUpdateEmailSubscriptionActor,
inports::<100>(emailAddress, subscriptionId, legalBasis, legalBasisExplanation),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_update_email_subscription(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/communication-preferences/v3/subscribe".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("emailAddress") {
body.insert("emailAddress".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("subscriptionId") {
body.insert("subscriptionId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("legalBasis") {
body.insert("legalBasis".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("legalBasisExplanation") {
body.insert("legalBasisExplanation".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("POST /communication-preferences/v3/subscribe failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotListOwnersActor,
inports::<100>(email, after, limit, archived),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_owners(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/owners".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("email") {
query_pairs.push(("email", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("archived") {
query_pairs.push(("archived", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /crm/v3/owners failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotReadOwnerActor,
inports::<100>(ownerId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_read_owner(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/owners/{ownerId}".to_string();
if let Some(val) = inputs.get("ownerId") {
endpoint = endpoint.replace("{{ownerId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /crm/v3/owners/{{ownerId}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotListPropertiesActor,
inports::<100>(objectType, archived),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_properties(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/properties/{objectType}".to_string();
if let Some(val) = inputs.get("objectType") {
endpoint = endpoint.replace("{{objectType}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("archived") {
query_pairs.push(("archived", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /crm/v3/properties/{{objectType}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotCreatePropertyActor,
inports::<100>(objectType, name, label, type_, fieldType, groupName, options),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_property(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/properties/{objectType}".to_string();
if let Some(val) = inputs.get("objectType") {
endpoint = endpoint.replace("{{objectType}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("label") {
body.insert("label".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("type_") {
body.insert("type".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("fieldType") {
body.insert("fieldType".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("groupName") {
body.insert("groupName".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("options") {
body.insert("options".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("POST /crm/v3/properties/{{objectType}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotListEngagementsActor,
inports::<100>(limit, after, properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_engagements(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/notes".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("properties") {
query_pairs.push(("properties", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /crm/v3/objects/notes failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotCreateNoteActor,
inports::<100>(properties, associations),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_note(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/notes".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("associations") {
body.insert("associations".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /crm/v3/objects/notes failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotCreateTaskActor,
inports::<100>(properties, associations),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_task(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/tasks".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("associations") {
body.insert("associations".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /crm/v3/objects/tasks failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotCreateMeetingActor,
inports::<100>(properties, associations),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_meeting(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/crm/v3/objects/meetings".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("associations") {
body.insert("associations".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /crm/v3/objects/meetings failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotListPipelinesActor,
inports::<100>(objectType),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_pipelines(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/pipelines/{objectType}".to_string();
if let Some(val) = inputs.get("objectType") {
endpoint = endpoint.replace("{{objectType}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /crm/v3/pipelines/{{objectType}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotCreatePipelineActor,
inports::<100>(objectType, label, displayOrder, stages),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_pipeline(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/crm/v3/pipelines/{objectType}".to_string();
if let Some(val) = inputs.get("objectType") {
endpoint = endpoint.replace("{{objectType}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("label") {
body.insert("label".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("displayOrder") {
body.insert("displayOrder".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("stages") {
body.insert("stages".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("POST /crm/v3/pipelines/{{objectType}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotCreateWebhookSubscriptionActor,
inports::<100>(appId, eventType, propertyName, active),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_webhook_subscription(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/webhooks/v3/{appId}/subscriptions".to_string();
if let Some(val) = inputs.get("appId") {
endpoint = endpoint.replace("{{appId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("eventType") {
body.insert("eventType".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("propertyName") {
body.insert("propertyName".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("active") {
body.insert("active".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("POST /webhooks/v3/{{appId}}/subscriptions failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotListWebhookSubscriptionsActor,
inports::<100>(appId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_webhook_subscriptions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/webhooks/v3/{appId}/subscriptions".to_string();
if let Some(val) = inputs.get("appId") {
endpoint = endpoint.replace("{{appId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /webhooks/v3/{{appId}}/subscriptions failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotListBlogPostsActor,
inports::<100>(createdAt, archived, limit, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_list_blog_posts(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/cms/v3/blogs/posts".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("createdAt") {
query_pairs.push(("createdAt", super::message_to_str(val)));
}
if let Some(val) = inputs.get("archived") {
query_pairs.push(("archived", super::message_to_str(val)));
}
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /cms/v3/blogs/posts failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotCreateBlogPostActor,
inports::<100>(name, contentGroupId, postBody, metaDescription, slug, state),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_create_blog_post(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/cms/v3/blogs/posts".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("contentGroupId") {
body.insert("contentGroupId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("postBody") {
body.insert("postBody".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("metaDescription") {
body.insert("metaDescription".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("slug") {
body.insert("slug".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("state") {
body.insert("state".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /cms/v3/blogs/posts failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
HubspotReadBlogPostActor,
inports::<100>(objectId, archived),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_read_blog_post(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/cms/v3/blogs/posts/{objectId}".to_string();
if let Some(val) = inputs.get("objectId") {
endpoint = endpoint.replace("{{objectId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("archived") {
query_pairs.push(("archived", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /cms/v3/blogs/posts/{{objectId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotUpdateBlogPostActor,
inports::<100>(objectId, name, postBody, state),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_update_blog_post(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/cms/v3/blogs/posts/{objectId}".to_string();
if let Some(val) = inputs.get("objectId") {
endpoint = endpoint.replace("{{objectId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("postBody") {
body.insert("postBody".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("state") {
body.insert("state".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("PATCH /cms/v3/blogs/posts/{{objectId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
HubspotDeleteBlogPostActor,
inports::<100>(objectId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn hubspot_delete_blog_post(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/cms/v3/blogs/posts/{objectId}".to_string();
if let Some(val) = inputs.get("objectId") {
endpoint = endpoint.replace("{{objectId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /cms/v3/blogs/posts/{{objectId}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}