#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, ClientBuilderExt, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
// Base URL that the actors below prepend to each endpoint path.
const BASE_URL: &str = "https://analyticsreporting.googleapis.com/v4";
// Key handed to `get_config_or_env` when looking up the bearer credential.
const ENV_KEY: &str = "GOOGLE_ANALYTICS_API_KEY";
/// Adds an `Authorization: Bearer <credential>` header to `builder`.
///
/// The credential is looked up with `get_config_or_env` under `ENV_KEY`.
///
/// # Errors
///
/// Returns an error when that lookup yields nothing.
fn apply_auth(
    config: &reflow_actor::ActorConfig,
    builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
    match config.get_config_or_env(ENV_KEY) {
        Some(token) => Ok(builder.header("Authorization", format!("Bearer {}", token))),
        None => Err(anyhow::anyhow!("Missing env var: {}", ENV_KEY)),
    }
}
/// Sends `POST {BASE_URL}/reports:batchGet` with the `reportRequests` input as JSON body.
///
/// The body is only attached when a `reportRequests` input is present.
///
/// # Outputs
///
/// * `response` - object with the HTTP `status`, the `headers` that are valid
///   strings, and the `body` (parsed as JSON when possible, otherwise the raw text).
/// * `error` - emitted instead of `response` when sending the request fails.
///
/// # Errors
///
/// Returns `Err` when the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    GoogleAnalyticsReadReportsActor,
    inports::<100>(reportRequests),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn google_analytics_read_reports(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let config = context.get_config();

    let url = [BASE_URL.trim_end_matches('/'), "/reports:batchGet"].concat();
    let http = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut request = apply_auth(
        config,
        http.post(&url).header("Content-Type", "application/json"),
    )?;

    // A JSON body is sent only when the caller supplied report requests.
    if let Some(requests) = payload.get("reportRequests") {
        let mut body = serde_json::Map::new();
        body.insert("reportRequests".to_string(), requests.clone().into());
        request = request.json(&serde_json::Value::Object(body));
    }

    let (port, message) = match request.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid strings are skipped.
            let mut headers: HashMap<String, String> = HashMap::new();
            for (name, value) in resp.headers().iter() {
                if let Ok(text) = value.to_str() {
                    headers.insert(name.to_string(), text.to_string());
                }
            }
            let raw = resp.text().await.unwrap_or_default();
            // Non-JSON bodies are passed through as a plain string.
            let parsed: Value = match serde_json::from_str(&raw) {
                Ok(value) => value,
                Err(_) => Value::String(raw),
            };
            let envelope = json!({
                "status": status,
                "headers": headers,
                "body": parsed,
            });
            ("response", Message::object(EncodableValue::from(envelope)))
        }
        Err(e) => (
            "error",
            Message::Error(format!("POST /reports:batchGet failed: {}", e).into()),
        ),
    };

    let mut output = HashMap::new();
    output.insert(port.to_string(), message);
    Ok(output)
}
/// Sends `GET {BASE_URL}/management/accounts` with no query parameters.
///
/// No value is read from the payload; the request is always the same.
///
/// # Outputs
///
/// * `response` - object with the HTTP `status`, the `headers` that are valid
///   strings, and the `body` (parsed as JSON when possible, otherwise the raw text).
/// * `error` - emitted instead of `response` when sending the request fails.
///
/// # Errors
///
/// Returns `Err` when the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    GoogleAnalyticsListAccountsActor,
    inports::<100>(trigger),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn google_analytics_list_accounts(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    // The payload is fetched but none of its entries are used by this actor.
    let _inputs = context.get_payload();
    let config = context.get_config();

    let url = format!(
        "{}{}",
        BASE_URL.trim_end_matches('/'),
        "/management/accounts"
    );
    let http = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let request = apply_auth(
        config,
        http.get(&url).header("Content-Type", "application/json"),
    )?;

    let (port, message) = match request.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid strings are skipped.
            let mut headers: HashMap<String, String> = HashMap::new();
            for (name, value) in resp.headers().iter() {
                if let Ok(text) = value.to_str() {
                    headers.insert(name.to_string(), text.to_string());
                }
            }
            let raw = resp.text().await.unwrap_or_default();
            // Non-JSON bodies are passed through as a plain string.
            let parsed: Value = match serde_json::from_str(&raw) {
                Ok(value) => value,
                Err(_) => Value::String(raw),
            };
            let envelope = json!({
                "status": status,
                "headers": headers,
                "body": parsed,
            });
            ("response", Message::object(EncodableValue::from(envelope)))
        }
        Err(e) => (
            "error",
            Message::Error(format!("GET /management/accounts failed: {}", e).into()),
        ),
    };

    let mut output = HashMap::new();
    output.insert(port.to_string(), message);
    Ok(output)
}
/// Sends `POST {BASE_URL}/reports:batchGet`, forwarding the `reportRequests` input as JSON.
///
/// When no `reportRequests` input is present the request is sent without a body.
///
/// # Outputs
///
/// * `response` - object with the HTTP `status`, the `headers` that are valid
///   strings, and the `body` (parsed as JSON when possible, otherwise the raw text).
/// * `error` - emitted instead of `response` when sending the request fails.
///
/// # Errors
///
/// Returns `Err` when the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    GoogleAnalyticsGenerateReportActor,
    inports::<100>(reportRequests),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn google_analytics_generate_report(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let config = context.get_config();

    let target = format!("{}{}", BASE_URL.trim_end_matches('/'), "/reports:batchGet");
    let http = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let request = http
        .post(&target)
        .header("Content-Type", "application/json");
    let request = apply_auth(config, request)?;

    // Attach the JSON body only if there is something to send.
    let request = match payload.get("reportRequests") {
        Some(requests) => {
            let mut body = serde_json::Map::new();
            body.insert("reportRequests".to_string(), requests.clone().into());
            request.json(&serde_json::Value::Object(body))
        }
        None => request,
    };

    let mut output = HashMap::new();
    match request.send().await {
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(format!("POST /reports:batchGet failed: {}", e).into()),
            );
        }
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Keep only the headers whose values are valid strings.
            let mut headers: HashMap<String, String> = HashMap::new();
            for (name, value) in resp.headers().iter() {
                if let Ok(text) = value.to_str() {
                    headers.insert(name.to_string(), text.to_string());
                }
            }
            let raw = resp.text().await.unwrap_or_default();
            // Fall back to the raw text when the body is not valid JSON.
            let parsed: Value = match serde_json::from_str(&raw) {
                Ok(value) => value,
                Err(_) => Value::String(raw),
            };
            let envelope = json!({
                "status": status,
                "headers": headers,
                "body": parsed,
            });
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(envelope)),
            );
        }
    }
    Ok(output)
}
/// Sends `GET {BASE_URL}/management/accounts` with optional paging parameters.
///
/// The `max_results` and `start_index` inputs, when present, are sent as the
/// `max-results` and `start-index` query parameters.
///
/// # Outputs
///
/// * `response` - object with the HTTP `status`, the `headers` that are valid
///   strings, and the `body` (parsed as JSON when possible, otherwise the raw text).
/// * `error` - emitted instead of `response` when sending the request fails.
///
/// # Errors
///
/// Returns `Err` when the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    GoogleAnalyticsReadAccountActor,
    inports::<100>(max_results, start_index),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn google_analytics_read_account(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let config = context.get_config();

    let url = [BASE_URL.trim_end_matches('/'), "/management/accounts"].concat();
    let http = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut request = apply_auth(
        config,
        http.get(&url).header("Content-Type", "application/json"),
    )?;

    // (input port, query parameter) pairs, emitted in this order.
    let query: Vec<(&str, String)> = [
        ("max_results", "max-results"),
        ("start_index", "start-index"),
    ]
    .iter()
    .filter_map(|&(port, param)| {
        payload
            .get(port)
            .map(|value| (param, super::message_to_str(value)))
    })
    .collect();
    if !query.is_empty() {
        request = request.query(&query);
    }

    let (port, message) = match request.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid strings are skipped.
            let mut headers: HashMap<String, String> = HashMap::new();
            for (name, value) in resp.headers().iter() {
                if let Ok(text) = value.to_str() {
                    headers.insert(name.to_string(), text.to_string());
                }
            }
            let raw = resp.text().await.unwrap_or_default();
            // Non-JSON bodies are passed through as a plain string.
            let parsed: Value = match serde_json::from_str(&raw) {
                Ok(value) => value,
                Err(_) => Value::String(raw),
            };
            let envelope = json!({
                "status": status,
                "headers": headers,
                "body": parsed,
            });
            ("response", Message::object(EncodableValue::from(envelope)))
        }
        Err(e) => (
            "error",
            Message::Error(format!("GET /management/accounts failed: {}", e).into()),
        ),
    };

    let mut output = HashMap::new();
    output.insert(port.to_string(), message);
    Ok(output)
}
/// Sends `GET {BASE_URL}/management/accounts/{accountId}/webproperties`.
///
/// The `accountId` input is substituted into the path; when it is absent the
/// placeholder is left in the URL as-is. The `max_results` input, when
/// present, is sent as the `max-results` query parameter.
///
/// # Outputs
///
/// * `response` - object with the HTTP `status`, the `headers` that are valid
///   strings, and the `body` (parsed as JSON when possible, otherwise the raw text).
/// * `error` - emitted instead of `response` when sending the request fails.
///
/// # Errors
///
/// Returns `Err` when the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    GoogleAnalyticsListPropertiesActor,
    inports::<100>(accountId, max_results),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn google_analytics_list_properties(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();
    let mut endpoint = "/management/accounts/{accountId}/webproperties".to_string();
    if let Some(val) = inputs.get("accountId") {
        // The template is a plain string, so the placeholder has single braces;
        // `{{…}}` escaping only applies inside format strings.
        endpoint = endpoint.replace("{accountId}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("max_results") {
        query_pairs.push(("max-results", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }
    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid strings are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Non-JSON bodies are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!(
                        "GET /management/accounts/{{accountId}}/webproperties failed: {}",
                        e
                    )
                    .into(),
                ),
            );
        }
    }
    Ok(output)
}
#[actor(
GoogleAnalyticsListViewsActor,
inports::<100>(accountId, webPropertyId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn google_analytics_list_views(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint =
"/management/accounts/{accountId}/webproperties/{webPropertyId}/profiles".to_string();
if let Some(val) = inputs.get("accountId") {
endpoint = endpoint.replace("{{accountId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("webPropertyId") {
endpoint = endpoint.replace("{{webPropertyId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /management/accounts/{{accountId}}/webproperties/{{webPropertyId}}/profiles failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
GoogleAnalyticsListGoalsActor,
inports::<100>(accountId, webPropertyId, profileId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn google_analytics_list_goals(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint =
"/management/accounts/{accountId}/webproperties/{webPropertyId}/profiles/{profileId}/goals"
.to_string();
if let Some(val) = inputs.get("accountId") {
endpoint = endpoint.replace("{{accountId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("webPropertyId") {
endpoint = endpoint.replace("{{webPropertyId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("profileId") {
endpoint = endpoint.replace("{{profileId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /management/accounts/{{accountId}}/webproperties/{{webPropertyId}}/profiles/{{profileId}}/goals failed: {}", e).into()));
}
}
Ok(output)
}
/// Sends `GET {BASE_URL}/management/segments` with optional paging parameters.
///
/// The `max_results` and `start_index` inputs, when present, are sent as the
/// `max-results` and `start-index` query parameters.
///
/// # Outputs
///
/// * `response` - object with the HTTP `status`, the `headers` that are valid
///   strings, and the `body` (parsed as JSON when possible, otherwise the raw text).
/// * `error` - emitted instead of `response` when sending the request fails.
///
/// # Errors
///
/// Returns `Err` when the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    GoogleAnalyticsListSegmentsActor,
    inports::<100>(max_results, start_index),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn google_analytics_list_segments(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let config = context.get_config();

    let url = format!(
        "{}{}",
        BASE_URL.trim_end_matches('/'),
        "/management/segments"
    );
    let http = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut request = apply_auth(
        config,
        http.get(&url).header("Content-Type", "application/json"),
    )?;

    // (input port, query parameter) pairs, emitted in this order.
    let mut query: Vec<(&str, String)> = Vec::new();
    for &(port, param) in [
        ("max_results", "max-results"),
        ("start_index", "start-index"),
    ]
    .iter()
    {
        if let Some(value) = payload.get(port) {
            query.push((param, super::message_to_str(value)));
        }
    }
    if !query.is_empty() {
        request = request.query(&query);
    }

    let (port, message) = match request.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid strings are skipped.
            let mut headers: HashMap<String, String> = HashMap::new();
            for (name, value) in resp.headers().iter() {
                if let Ok(text) = value.to_str() {
                    headers.insert(name.to_string(), text.to_string());
                }
            }
            let raw = resp.text().await.unwrap_or_default();
            // Non-JSON bodies are passed through as a plain string.
            let parsed: Value = match serde_json::from_str(&raw) {
                Ok(value) => value,
                Err(_) => Value::String(raw),
            };
            let envelope = json!({
                "status": status,
                "headers": headers,
                "body": parsed,
            });
            ("response", Message::object(EncodableValue::from(envelope)))
        }
        Err(e) => (
            "error",
            Message::Error(format!("GET /management/segments failed: {}", e).into()),
        ),
    };

    let mut output = HashMap::new();
    output.insert(port.to_string(), message);
    Ok(output)
}
/// Sends `GET {BASE_URL}/data/realtime` with the supplied inputs as query parameters.
///
/// Each of the `ids`, `metrics`, `dimensions` and `filters` inputs is sent
/// under its own name; `max_results` is sent as `max-results`. Absent inputs
/// are omitted.
///
/// # Outputs
///
/// * `response` - object with the HTTP `status`, the `headers` that are valid
///   strings, and the `body` (parsed as JSON when possible, otherwise the raw text).
/// * `error` - emitted instead of `response` when sending the request fails.
///
/// # Errors
///
/// Returns `Err` when the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    GoogleAnalyticsAnalyzeRealtimeReportActor,
    inports::<100>(ids, metrics, dimensions, filters, max_results),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn google_analytics_analyze_realtime_report(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let config = context.get_config();

    let url = [BASE_URL.trim_end_matches('/'), "/data/realtime"].concat();
    let http = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut request = apply_auth(
        config,
        http.get(&url).header("Content-Type", "application/json"),
    )?;

    // (input port, query parameter) pairs, emitted in this order.
    let query: Vec<(&str, String)> = [
        ("ids", "ids"),
        ("metrics", "metrics"),
        ("dimensions", "dimensions"),
        ("filters", "filters"),
        ("max_results", "max-results"),
    ]
    .iter()
    .filter_map(|&(port, param)| {
        payload
            .get(port)
            .map(|value| (param, super::message_to_str(value)))
    })
    .collect();
    if !query.is_empty() {
        request = request.query(&query);
    }

    let (port, message) = match request.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid strings are skipped.
            let mut headers: HashMap<String, String> = HashMap::new();
            for (name, value) in resp.headers().iter() {
                if let Ok(text) = value.to_str() {
                    headers.insert(name.to_string(), text.to_string());
                }
            }
            let raw = resp.text().await.unwrap_or_default();
            // Non-JSON bodies are passed through as a plain string.
            let parsed: Value = match serde_json::from_str(&raw) {
                Ok(value) => value,
                Err(_) => Value::String(raw),
            };
            let envelope = json!({
                "status": status,
                "headers": headers,
                "body": parsed,
            });
            ("response", Message::object(EncodableValue::from(envelope)))
        }
        Err(e) => (
            "error",
            Message::Error(format!("GET /data/realtime failed: {}", e).into()),
        ),
    };

    let mut output = HashMap::new();
    output.insert(port.to_string(), message);
    Ok(output)
}
/// Sends `GET {BASE_URL}/data/ga` with the supplied inputs as query parameters.
///
/// Inputs whose names contain an underscore (`start_date`, `end_date`,
/// `max_results`, `start_index`) are sent with a hyphen instead; the other
/// inputs are sent under their own names. Absent inputs are omitted.
///
/// # Outputs
///
/// * `response` - object with the HTTP `status`, the `headers` that are valid
///   strings, and the `body` (parsed as JSON when possible, otherwise the raw text).
/// * `error` - emitted instead of `response` when sending the request fails.
///
/// # Errors
///
/// Returns `Err` when the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    GoogleAnalyticsAnalyzeGaReportActor,
    inports::<100>(ids, start_date, end_date, metrics, dimensions, sort, filters, segment, max_results, start_index),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn google_analytics_analyze_ga_report(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let config = context.get_config();

    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), "/data/ga");
    let http = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut request = apply_auth(
        config,
        http.get(&url).header("Content-Type", "application/json"),
    )?;

    // (input port, query parameter) pairs, emitted in this order.
    let mut query: Vec<(&str, String)> = Vec::new();
    for &(port, param) in [
        ("ids", "ids"),
        ("start_date", "start-date"),
        ("end_date", "end-date"),
        ("metrics", "metrics"),
        ("dimensions", "dimensions"),
        ("sort", "sort"),
        ("filters", "filters"),
        ("segment", "segment"),
        ("max_results", "max-results"),
        ("start_index", "start-index"),
    ]
    .iter()
    {
        if let Some(value) = payload.get(port) {
            query.push((param, super::message_to_str(value)));
        }
    }
    if !query.is_empty() {
        request = request.query(&query);
    }

    let (port, message) = match request.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid strings are skipped.
            let mut headers: HashMap<String, String> = HashMap::new();
            for (name, value) in resp.headers().iter() {
                if let Ok(text) = value.to_str() {
                    headers.insert(name.to_string(), text.to_string());
                }
            }
            let raw = resp.text().await.unwrap_or_default();
            // Non-JSON bodies are passed through as a plain string.
            let parsed: Value = match serde_json::from_str(&raw) {
                Ok(value) => value,
                Err(_) => Value::String(raw),
            };
            let envelope = json!({
                "status": status,
                "headers": headers,
                "body": parsed,
            });
            ("response", Message::object(EncodableValue::from(envelope)))
        }
        Err(e) => (
            "error",
            Message::Error(format!("GET /data/ga failed: {}", e).into()),
        ),
    };

    let mut output = HashMap::new();
    output.insert(port.to_string(), message);
    Ok(output)
}
#[actor(
GoogleAnalyticsListCustomDimensionsActor,
inports::<100>(accountId, webPropertyId, max_results),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn google_analytics_list_custom_dimensions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint =
"/management/accounts/{accountId}/webproperties/{webPropertyId}/customDimensions"
.to_string();
if let Some(val) = inputs.get("accountId") {
endpoint = endpoint.replace("{{accountId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("webPropertyId") {
endpoint = endpoint.replace("{{webPropertyId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("max_results") {
query_pairs.push(("max-results", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /management/accounts/{{accountId}}/webproperties/{{webPropertyId}}/customDimensions failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
GoogleAnalyticsListCustomMetricsActor,
inports::<100>(accountId, webPropertyId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn google_analytics_list_custom_metrics(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint =
"/management/accounts/{accountId}/webproperties/{webPropertyId}/customMetrics".to_string();
if let Some(val) = inputs.get("accountId") {
endpoint = endpoint.replace("{{accountId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("webPropertyId") {
endpoint = endpoint.replace("{{webPropertyId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /management/accounts/{{accountId}}/webproperties/{{webPropertyId}}/customMetrics failed: {}", e).into()));
}
}
Ok(output)
}
/// Sends `GET {BASE_URL}/management/accounts/{accountId}/filters`.
///
/// The `accountId` input is substituted into the path; when it is absent the
/// placeholder is left in the URL as-is.
///
/// # Outputs
///
/// * `response` - object with the HTTP `status`, the `headers` that are valid
///   strings, and the `body` (parsed as JSON when possible, otherwise the raw text).
/// * `error` - emitted instead of `response` when sending the request fails.
///
/// # Errors
///
/// Returns `Err` when the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    GoogleAnalyticsListFiltersActor,
    inports::<100>(accountId),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn google_analytics_list_filters(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();
    let mut endpoint = "/management/accounts/{accountId}/filters".to_string();
    if let Some(val) = inputs.get("accountId") {
        // The template is a plain string, so the placeholder has single braces;
        // `{{…}}` escaping only applies inside format strings.
        endpoint = endpoint.replace("{accountId}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;
    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid strings are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Non-JSON bodies are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!(
                        "GET /management/accounts/{{accountId}}/filters failed: {}",
                        e
                    )
                    .into(),
                ),
            );
        }
    }
    Ok(output)
}
/// Sends `GET {BASE_URL}/management/accountSummaries` with optional paging parameters.
///
/// The `max_results` and `start_index` inputs, when present, are sent as the
/// `max-results` and `start-index` query parameters.
///
/// # Outputs
///
/// * `response` - object with the HTTP `status`, the `headers` that are valid
///   strings, and the `body` (parsed as JSON when possible, otherwise the raw text).
/// * `error` - emitted instead of `response` when sending the request fails.
///
/// # Errors
///
/// Returns `Err` when the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    GoogleAnalyticsListAccountSummariesActor,
    inports::<100>(max_results, start_index),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn google_analytics_list_account_summaries(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let config = context.get_config();

    let url = [
        BASE_URL.trim_end_matches('/'),
        "/management/accountSummaries",
    ]
    .concat();
    let http = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut request = apply_auth(
        config,
        http.get(&url).header("Content-Type", "application/json"),
    )?;

    // (input port, query parameter) pairs, emitted in this order.
    let query: Vec<(&str, String)> = [
        ("max_results", "max-results"),
        ("start_index", "start-index"),
    ]
    .iter()
    .filter_map(|&(port, param)| {
        payload
            .get(port)
            .map(|value| (param, super::message_to_str(value)))
    })
    .collect();
    if !query.is_empty() {
        request = request.query(&query);
    }

    let (port, message) = match request.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid strings are skipped.
            let mut headers: HashMap<String, String> = HashMap::new();
            for (name, value) in resp.headers().iter() {
                if let Ok(text) = value.to_str() {
                    headers.insert(name.to_string(), text.to_string());
                }
            }
            let raw = resp.text().await.unwrap_or_default();
            // Non-JSON bodies are passed through as a plain string.
            let parsed: Value = match serde_json::from_str(&raw) {
                Ok(value) => value,
                Err(_) => Value::String(raw),
            };
            let envelope = json!({
                "status": status,
                "headers": headers,
                "body": parsed,
            });
            ("response", Message::object(EncodableValue::from(envelope)))
        }
        Err(e) => (
            "error",
            Message::Error(format!("GET /management/accountSummaries failed: {}", e).into()),
        ),
    };

    let mut output = HashMap::new();
    output.insert(port.to_string(), message);
    Ok(output)
}