#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, ClientBuilderExt, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
const BASE_URL: &str = "https://api.segment.io/v1";
const ENV_KEY: &str = "SEGMENT_API_KEY";
fn apply_auth(
config: &reflow_actor::ActorConfig,
mut builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
let credential = config
.get_config_or_env(ENV_KEY)
.ok_or_else(|| anyhow::anyhow!("Missing env var: {}", ENV_KEY))?;
builder = builder.header("Authorization", format!("Basic {}", credential));
Ok(builder)
}
#[actor(
SegmentSendEventActor,
inports::<100>(event, userId, properties),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_send_event(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/track".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("event") {
body.insert("event".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("userId") {
body.insert("userId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /track failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
SegmentIdentifyUserActor,
inports::<100>(userId, traits),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_identify_user(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/identify".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("userId") {
body.insert("userId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("traits") {
body.insert("traits".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /identify failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
SegmentSendTrackEventActor,
inports::<100>(userId, anonymousId, event, properties, timestamp, context, integrations, messageId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_send_track_event(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/track".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("userId") {
body.insert("userId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("anonymousId") {
body.insert("anonymousId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("event") {
body.insert("event".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("timestamp") {
body.insert("timestamp".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("context") {
body.insert("context".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("integrations") {
body.insert("integrations".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("messageId") {
body.insert("messageId".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /track failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
SegmentSendIdentifyActor,
inports::<100>(userId, anonymousId, traits, timestamp, context, integrations),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_send_identify(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/identify".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("userId") {
body.insert("userId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("anonymousId") {
body.insert("anonymousId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("traits") {
body.insert("traits".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("timestamp") {
body.insert("timestamp".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("context") {
body.insert("context".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("integrations") {
body.insert("integrations".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /identify failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
SegmentSendPageActor,
inports::<100>(userId, anonymousId, name, properties, timestamp, context, integrations),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_send_page(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/page".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("userId") {
body.insert("userId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("anonymousId") {
body.insert("anonymousId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("timestamp") {
body.insert("timestamp".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("context") {
body.insert("context".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("integrations") {
body.insert("integrations".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /page failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
SegmentSendScreenActor,
inports::<100>(userId, anonymousId, name, properties, timestamp, context, integrations),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_send_screen(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/screen".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("userId") {
body.insert("userId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("anonymousId") {
body.insert("anonymousId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("properties") {
body.insert("properties".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("timestamp") {
body.insert("timestamp".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("context") {
body.insert("context".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("integrations") {
body.insert("integrations".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /screen failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
SegmentSendGroupActor,
inports::<100>(userId, anonymousId, groupId, traits, timestamp, context, integrations),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_send_group(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/group".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("userId") {
body.insert("userId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("anonymousId") {
body.insert("anonymousId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("groupId") {
body.insert("groupId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("traits") {
body.insert("traits".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("timestamp") {
body.insert("timestamp".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("context") {
body.insert("context".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("integrations") {
body.insert("integrations".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /group failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
SegmentCreateAliasActor,
inports::<100>(userId, previousId, timestamp, context, integrations),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_create_alias(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/alias".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("userId") {
body.insert("userId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("previousId") {
body.insert("previousId".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("timestamp") {
body.insert("timestamp".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("context") {
body.insert("context".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("integrations") {
body.insert("integrations".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /alias failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
SegmentSendBatchActor,
inports::<100>(batch, context, integrations),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_send_batch(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/batch".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("batch") {
body.insert("batch".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("context") {
body.insert("context".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("integrations") {
body.insert("integrations".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /batch failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
SegmentReadEventSchemaActor,
inports::<100>(source_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_read_event_schema(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/sources/{source_id}/tracking-plans".to_string();
if let Some(val) = inputs.get("source_id") {
endpoint = endpoint.replace("{{source_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /v1/sources/{{source_id}}/tracking-plans failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
SegmentListSourcesActor,
inports::<100>(workspaceSlug),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_list_sources(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/workspaces/{workspaceSlug}/sources".to_string();
if let Some(val) = inputs.get("workspaceSlug") {
endpoint = endpoint.replace("{{workspaceSlug}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /v1/workspaces/{{workspaceSlug}}/sources failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
SegmentReadSourceActor,
inports::<100>(workspaceSlug, sourceName),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_read_source(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/workspaces/{workspaceSlug}/sources/{sourceName}".to_string();
if let Some(val) = inputs.get("workspaceSlug") {
endpoint = endpoint.replace("{{workspaceSlug}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("sourceName") {
endpoint = endpoint.replace("{{sourceName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /v1/workspaces/{{workspaceSlug}}/sources/{{sourceName}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
SegmentListDestinationsActor,
inports::<100>(workspaceSlug),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_list_destinations(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/workspaces/{workspaceSlug}/destinations".to_string();
if let Some(val) = inputs.get("workspaceSlug") {
endpoint = endpoint.replace("{{workspaceSlug}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /v1/workspaces/{{workspaceSlug}}/destinations failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
SegmentReadDestinationActor,
inports::<100>(workspaceSlug, destinationName),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_read_destination(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/workspaces/{workspaceSlug}/destinations/{destinationName}".to_string();
if let Some(val) = inputs.get("workspaceSlug") {
endpoint = endpoint.replace("{{workspaceSlug}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("destinationName") {
endpoint = endpoint.replace("{{destinationName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /v1/workspaces/{{workspaceSlug}}/destinations/{{destinationName}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
SegmentListTrackingPlansActor,
inports::<100>(workspaceSlug),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_list_tracking_plans(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/workspaces/{workspaceSlug}/tracking-plans".to_string();
if let Some(val) = inputs.get("workspaceSlug") {
endpoint = endpoint.replace("{{workspaceSlug}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /v1/workspaces/{{workspaceSlug}}/tracking-plans failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
SegmentCreateTrackingPlanActor,
inports::<100>(workspaceSlug, display_name, rules),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_create_tracking_plan(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/workspaces/{workspaceSlug}/tracking-plans".to_string();
if let Some(val) = inputs.get("workspaceSlug") {
endpoint = endpoint.replace("{{workspaceSlug}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("display_name") {
body.insert("display_name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("rules") {
body.insert("rules".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"POST /v1/workspaces/{{workspaceSlug}}/tracking-plans failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
SegmentReadTrackingPlanActor,
inports::<100>(workspaceSlug, trackingPlanId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_read_tracking_plan(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/workspaces/{workspaceSlug}/tracking-plans/{trackingPlanId}".to_string();
if let Some(val) = inputs.get("workspaceSlug") {
endpoint = endpoint.replace("{{workspaceSlug}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("trackingPlanId") {
endpoint = endpoint.replace("{{trackingPlanId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /v1/workspaces/{{workspaceSlug}}/tracking-plans/{{trackingPlanId}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
SegmentUpdateTrackingPlanActor,
inports::<100>(workspaceSlug, trackingPlanId, display_name, rules),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_update_tracking_plan(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/workspaces/{workspaceSlug}/tracking-plans/{trackingPlanId}".to_string();
if let Some(val) = inputs.get("workspaceSlug") {
endpoint = endpoint.replace("{{workspaceSlug}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("trackingPlanId") {
endpoint = endpoint.replace("{{trackingPlanId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("display_name") {
body.insert("display_name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("rules") {
body.insert("rules".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("PUT /v1/workspaces/{{workspaceSlug}}/tracking-plans/{{trackingPlanId}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
SegmentDeleteTrackingPlanActor,
inports::<100>(workspaceSlug, trackingPlanId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_delete_tracking_plan(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/workspaces/{workspaceSlug}/tracking-plans/{trackingPlanId}".to_string();
if let Some(val) = inputs.get("workspaceSlug") {
endpoint = endpoint.replace("{{workspaceSlug}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("trackingPlanId") {
endpoint = endpoint.replace("{{trackingPlanId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("DELETE /v1/workspaces/{{workspaceSlug}}/tracking-plans/{{trackingPlanId}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
SegmentListWorkspaceUsersActor,
inports::<100>(workspaceSlug),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn segment_list_workspace_users(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/workspaces/{workspaceSlug}/members".to_string();
if let Some(val) = inputs.get("workspaceSlug") {
endpoint = endpoint.replace("{{workspaceSlug}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /v1/workspaces/{{workspaceSlug}}/members failed: {}", e).into(),
),
);
}
}
Ok(output)
}