#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, ClientBuilderExt, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
const BASE_URL: &str = "https://api.twitch.tv/helix";
const ENV_KEY: &str = "TWITCH_API_KEY";
fn apply_auth(
config: &reflow_actor::ActorConfig,
mut builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
let credential = config
.get_config_or_env(ENV_KEY)
.ok_or_else(|| anyhow::anyhow!("Missing env var: {}", ENV_KEY))?;
builder = builder.header("Authorization", format!("Bearer {}", credential));
Ok(builder)
}
#[actor(
TwitchReadUsersActor,
inports::<100>(id, login),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_users(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/users".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("login") {
query_pairs.push(("login", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchListStreamsActor,
inports::<100>(game_id, language, first, user_id, user_login, type_, before, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_list_streams(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/streams".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("game_id") {
query_pairs.push(("game_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("language") {
query_pairs.push(("language", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_login") {
query_pairs.push(("user_login", super::message_to_str(val)));
}
if let Some(val) = inputs.get("type_") {
query_pairs.push(("type", super::message_to_str(val)));
}
if let Some(val) = inputs.get("before") {
query_pairs.push(("before", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /streams failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadExtensionAnalyticsActor,
inports::<100>(extension_id, type_, started_at, ended_at, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_extension_analytics(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/analytics/extensions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("extension_id") {
query_pairs.push(("extension_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("type_") {
query_pairs.push(("type", super::message_to_str(val)));
}
if let Some(val) = inputs.get("started_at") {
query_pairs.push(("started_at", super::message_to_str(val)));
}
if let Some(val) = inputs.get("ended_at") {
query_pairs.push(("ended_at", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /analytics/extensions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadGameAnalyticsActor,
inports::<100>(game_id, type_, started_at, ended_at, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_game_analytics(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/analytics/games".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("game_id") {
query_pairs.push(("game_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("type_") {
query_pairs.push(("type", super::message_to_str(val)));
}
if let Some(val) = inputs.get("started_at") {
query_pairs.push(("started_at", super::message_to_str(val)));
}
if let Some(val) = inputs.get("ended_at") {
query_pairs.push(("ended_at", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /analytics/games failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadAuthorizationByUserActor,
inports::<100>(user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_authorization_by_user(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/authorization/users".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /authorization/users failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadCheermotesActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_cheermotes(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/bits/cheermotes".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /bits/cheermotes failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateExtensionBitsProductActor,
inports::<100>(cost, expiration, is_broadcast, display_name, sku, in_development),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_extension_bits_product(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/bits/extensions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("cost") {
body.insert("cost".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("expiration") {
body.insert("expiration".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_broadcast") {
body.insert("is_broadcast".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("display_name") {
body.insert("display_name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("sku") {
body.insert("sku".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("in_development") {
body.insert("in_development".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /bits/extensions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadExtensionBitsProductsActor,
inports::<100>(should_include_all),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_extension_bits_products(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/bits/extensions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("should_include_all") {
query_pairs.push(("should_include_all", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /bits/extensions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadBitsLeaderboardActor,
inports::<100>(count, period, started_at, user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_bits_leaderboard(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/bits/leaderboard".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("count") {
query_pairs.push(("count", super::message_to_str(val)));
}
if let Some(val) = inputs.get("period") {
query_pairs.push(("period", super::message_to_str(val)));
}
if let Some(val) = inputs.get("started_at") {
query_pairs.push(("started_at", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /bits/leaderboard failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadCustomRewardActor,
inports::<100>(broadcaster_id, id, only_manageable_rewards),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_custom_reward(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channel_points/custom_rewards".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("only_manageable_rewards") {
query_pairs.push(("only_manageable_rewards", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /channel_points/custom_rewards failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateCustomRewardsActor,
inports::<100>(broadcaster_id, max_per_user_per_stream, prompt, should_redemptions_skip_request_queue, title, is_max_per_stream_enabled, cost, global_cooldown_seconds, is_enabled, is_max_per_user_per_stream_enabled, is_user_input_required, background_color, max_per_stream, is_global_cooldown_enabled),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_custom_rewards(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channel_points/custom_rewards".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("max_per_user_per_stream") {
body.insert("max_per_user_per_stream".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("prompt") {
body.insert("prompt".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("should_redemptions_skip_request_queue") {
body.insert(
"should_redemptions_skip_request_queue".to_string(),
val.clone().into(),
);
}
if let Some(val) = inputs.get("title") {
body.insert("title".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_max_per_stream_enabled") {
body.insert("is_max_per_stream_enabled".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("cost") {
body.insert("cost".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("global_cooldown_seconds") {
body.insert("global_cooldown_seconds".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_enabled") {
body.insert("is_enabled".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_max_per_user_per_stream_enabled") {
body.insert(
"is_max_per_user_per_stream_enabled".to_string(),
val.clone().into(),
);
}
if let Some(val) = inputs.get("is_user_input_required") {
body.insert("is_user_input_required".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("background_color") {
body.insert("background_color".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("max_per_stream") {
body.insert("max_per_stream".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_global_cooldown_enabled") {
body.insert("is_global_cooldown_enabled".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /channel_points/custom_rewards failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteCustomRewardActor,
inports::<100>(broadcaster_id, id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_custom_reward(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channel_points/custom_rewards".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /channel_points/custom_rewards failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateCustomRewardActor,
inports::<100>(broadcaster_id, id, is_max_per_stream_enabled, global_cooldown_seconds, cost, background_color, is_max_per_user_per_stream_enabled, is_enabled, is_global_cooldown_enabled, is_paused, max_per_stream, prompt, should_redemptions_skip_request_queue, title, is_user_input_required, max_per_user_per_stream),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_custom_reward(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channel_points/custom_rewards".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("is_max_per_stream_enabled") {
body.insert("is_max_per_stream_enabled".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("global_cooldown_seconds") {
body.insert("global_cooldown_seconds".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("cost") {
body.insert("cost".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("background_color") {
body.insert("background_color".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_max_per_user_per_stream_enabled") {
body.insert(
"is_max_per_user_per_stream_enabled".to_string(),
val.clone().into(),
);
}
if let Some(val) = inputs.get("is_enabled") {
body.insert("is_enabled".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_global_cooldown_enabled") {
body.insert("is_global_cooldown_enabled".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_paused") {
body.insert("is_paused".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("max_per_stream") {
body.insert("max_per_stream".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("prompt") {
body.insert("prompt".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("should_redemptions_skip_request_queue") {
body.insert(
"should_redemptions_skip_request_queue".to_string(),
val.clone().into(),
);
}
if let Some(val) = inputs.get("title") {
body.insert("title".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_user_input_required") {
body.insert("is_user_input_required".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("max_per_user_per_stream") {
body.insert("max_per_user_per_stream".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("PATCH /channel_points/custom_rewards failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
TwitchReadCustomRewardRedemptionActor,
inports::<100>(broadcaster_id, reward_id, status, id, sort, after, first),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_custom_reward_redemption(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channel_points/custom_rewards/redemptions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("reward_id") {
query_pairs.push(("reward_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("status") {
query_pairs.push(("status", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("sort") {
query_pairs.push(("sort", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /channel_points/custom_rewards/redemptions failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateRedemptionStatusActor,
inports::<100>(id, broadcaster_id, reward_id, status),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_redemption_status(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channel_points/custom_rewards/redemptions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("reward_id") {
query_pairs.push(("reward_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("status") {
body.insert("status".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"PATCH /channel_points/custom_rewards/redemptions failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
TwitchReadChannelInformationActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_channel_information(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channels".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /channels failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateChannelsActor,
inports::<100>(broadcaster_id, game_id, delay, broadcaster_language, tags, content_classification_labels, is_branded_content, title),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_channels(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channels".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("game_id") {
body.insert("game_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("delay") {
body.insert("delay".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("broadcaster_language") {
body.insert("broadcaster_language".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("tags") {
body.insert("tags".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("content_classification_labels") {
body.insert(
"content_classification_labels".to_string(),
val.clone().into(),
);
}
if let Some(val) = inputs.get("is_branded_content") {
body.insert("is_branded_content".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("title") {
body.insert("title".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /channels failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadAdScheduleActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_ad_schedule(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channels/ads".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /channels/ads failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateAdsActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_ads(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channels/ads/schedule/snooze".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /channels/ads/schedule/snooze failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchStartCommercialActor,
inports::<100>(length, broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_start_commercial(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channels/commercial".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("length") {
body.insert("length".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("broadcaster_id") {
body.insert("broadcaster_id".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /channels/commercial failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadChannelEditorsActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_channel_editors(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channels/editors".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /channels/editors failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadFollowedChannelsActor,
inports::<100>(user_id, broadcaster_id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_followed_channels(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channels/followed".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /channels/followed failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadChannelFollowersActor,
inports::<100>(user_id, broadcaster_id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_channel_followers(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channels/followers".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /channels/followers failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateModerationActor,
inports::<100>(user_id, broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_moderation(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channels/vips".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /channels/vips failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadVipsActor,
inports::<100>(user_id, broadcaster_id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_vips(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channels/vips".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /channels/vips failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteChannelVipActor,
inports::<100>(user_id, broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_channel_vip(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/channels/vips".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /channels/vips failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadCharityCampaignActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_charity_campaign(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/charity/campaigns".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /charity/campaigns failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadCharityCampaignDonationsActor,
inports::<100>(broadcaster_id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_charity_campaign_donations(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/charity/donations".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /charity/donations failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchSendChatAnnouncementActor,
inports::<100>(broadcaster_id, moderator_id, color, message),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_send_chat_announcement(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/announcements".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("color") {
body.insert("color".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("message") {
body.insert("message".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /chat/announcements failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadChannelChatBadgesActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_channel_chat_badges(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/badges".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /chat/badges failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadGlobalChatBadgesActor,
inports::<100>(trigger),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_global_chat_badges(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/badges/global".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /chat/badges/global failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadChattersActor,
inports::<100>(broadcaster_id, moderator_id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_chatters(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/chatters".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /chat/chatters failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadUserChatColorActor,
inports::<100>(user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_user_chat_color(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/color".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /chat/color failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateUserChatColorActor,
inports::<100>(user_id, color),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_user_chat_color(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/color".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("color") {
query_pairs.push(("color", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /chat/color failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadChannelEmotesActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_channel_emotes(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/emotes".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /chat/emotes failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadGlobalEmotesActor,
inports::<100>(trigger),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_global_emotes(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/emotes/global".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /chat/emotes/global failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadEmoteSetsActor,
inports::<100>(emote_set_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_emote_sets(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/emotes/set".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("emote_set_id") {
query_pairs.push(("emote_set_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /chat/emotes/set failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadUserEmotesActor,
inports::<100>(user_id, after, broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_user_emotes(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/emotes/user".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /chat/emotes/user failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchSendChatMessageActor,
inports::<100>(sender_id, reply_parent_message_id, for_source_only, broadcaster_id, message),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_send_chat_message(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/messages".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("sender_id") {
body.insert("sender_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("reply_parent_message_id") {
body.insert("reply_parent_message_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("for_source_only") {
body.insert("for_source_only".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("broadcaster_id") {
body.insert("broadcaster_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("message") {
body.insert("message".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /chat/messages failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadChatSettingsActor,
inports::<100>(broadcaster_id, moderator_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_chat_settings(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/settings".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /chat/settings failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateChatSettingsActor,
inports::<100>(broadcaster_id, moderator_id, slow_mode_wait_time, emote_mode, non_moderator_chat_delay_duration, subscriber_mode, follower_mode, unique_chat_mode, non_moderator_chat_delay, slow_mode, follower_mode_duration),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_chat_settings(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/settings".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("slow_mode_wait_time") {
body.insert("slow_mode_wait_time".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("emote_mode") {
body.insert("emote_mode".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("non_moderator_chat_delay_duration") {
body.insert(
"non_moderator_chat_delay_duration".to_string(),
val.clone().into(),
);
}
if let Some(val) = inputs.get("subscriber_mode") {
body.insert("subscriber_mode".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("follower_mode") {
body.insert("follower_mode".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("unique_chat_mode") {
body.insert("unique_chat_mode".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("non_moderator_chat_delay") {
body.insert("non_moderator_chat_delay".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("slow_mode") {
body.insert("slow_mode".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("follower_mode_duration") {
body.insert("follower_mode_duration".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /chat/settings failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchSendAShoutoutActor,
inports::<100>(from_broadcaster_id, to_broadcaster_id, moderator_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_send_a_shoutout(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/shoutouts".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("from_broadcaster_id") {
query_pairs.push(("from_broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("to_broadcaster_id") {
query_pairs.push(("to_broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /chat/shoutouts failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateClipActor,
inports::<100>(broadcaster_id, title, duration),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_clip(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/clips".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("title") {
query_pairs.push(("title", super::message_to_str(val)));
}
if let Some(val) = inputs.get("duration") {
query_pairs.push(("duration", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /clips failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadClipsActor,
inports::<100>(broadcaster_id, game_id, id, started_at, ended_at, first, before, after, is_featured),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_clips(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/clips".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("game_id") {
query_pairs.push(("game_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("started_at") {
query_pairs.push(("started_at", super::message_to_str(val)));
}
if let Some(val) = inputs.get("ended_at") {
query_pairs.push(("ended_at", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("before") {
query_pairs.push(("before", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("is_featured") {
query_pairs.push(("is_featured", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /clips failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadClipsDownloadActor,
inports::<100>(editor_id, broadcaster_id, clip_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_clips_download(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/clips/downloads".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("editor_id") {
query_pairs.push(("editor_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("clip_id") {
query_pairs.push(("clip_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /clips/downloads failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadContentClassificationLabelsActor,
inports::<100>(locale),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_content_classification_labels(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/content_classification_labels".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("locale") {
query_pairs.push(("locale", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /content_classification_labels failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadDropsEntitlementsActor,
inports::<100>(id, user_id, game_id, fulfillment_status, after, first),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_drops_entitlements(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/entitlements/drops".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("game_id") {
query_pairs.push(("game_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("fulfillment_status") {
query_pairs.push(("fulfillment_status", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /entitlements/drops failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateDropsEntitlementsActor,
inports::<100>(entitlement_ids, fulfillment_status),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_drops_entitlements(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/entitlements/drops".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("entitlement_ids") {
body.insert("entitlement_ids".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("fulfillment_status") {
body.insert("fulfillment_status".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /entitlements/drops failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadConduitsActor,
inports::<100>(trigger),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_conduits(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/eventsub/conduits".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /eventsub/conduits failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteConduitActor,
inports::<100>(id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_conduit(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/eventsub/conduits".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /eventsub/conduits failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateConduitsActor,
inports::<100>(shard_count),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_conduits(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/eventsub/conduits".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("shard_count") {
body.insert("shard_count".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /eventsub/conduits failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateConduitsActor,
inports::<100>(id, shard_count),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_conduits(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/eventsub/conduits".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("id") {
body.insert("id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("shard_count") {
body.insert("shard_count".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /eventsub/conduits failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateConduitShardsActor,
inports::<100>(conduit_id, shards),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_conduit_shards(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/eventsub/conduits/shards".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("conduit_id") {
body.insert("conduit_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("shards") {
body.insert("shards".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /eventsub/conduits/shards failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadConduitShardsActor,
inports::<100>(conduit_id, status, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_conduit_shards(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/eventsub/conduits/shards".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("conduit_id") {
query_pairs.push(("conduit_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("status") {
query_pairs.push(("status", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /eventsub/conduits/shards failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadEventsubSubscriptionsActor,
inports::<100>(status, type_, user_id, subscription_id, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_eventsub_subscriptions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/eventsub/subscriptions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("status") {
query_pairs.push(("status", super::message_to_str(val)));
}
if let Some(val) = inputs.get("type_") {
query_pairs.push(("type", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("subscription_id") {
query_pairs.push(("subscription_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /eventsub/subscriptions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateEventsubSubscriptionActor,
inports::<100>(transport, condition, type_, version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_eventsub_subscription(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/eventsub/subscriptions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("transport") {
body.insert("transport".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("condition") {
body.insert("condition".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("type_") {
body.insert("type".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("version") {
body.insert("version".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /eventsub/subscriptions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteEventsubSubscriptionActor,
inports::<100>(id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_eventsub_subscription(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/eventsub/subscriptions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /eventsub/subscriptions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadExtensionsActor,
inports::<100>(extension_id, extension_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_extensions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/extensions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("extension_id") {
query_pairs.push(("extension_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("extension_version") {
query_pairs.push(("extension_version", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /extensions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchSendExtensionChatMessageActor,
inports::<100>(broadcaster_id, extension_id, extension_version, text),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_send_extension_chat_message(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/extensions/chat".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("extension_id") {
body.insert("extension_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("extension_version") {
body.insert("extension_version".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("text") {
body.insert("text".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /extensions/chat failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadExtensionConfigurationSegmentActor,
inports::<100>(broadcaster_id, extension_id, segment),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_extension_configuration_segment(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/extensions/configurations".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("extension_id") {
query_pairs.push(("extension_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("segment") {
query_pairs.push(("segment", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /extensions/configurations failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateExtensionsActor,
inports::<100>(content, segment, broadcaster_id, extension_id, version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_extensions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/extensions/configurations".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("content") {
body.insert("content".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("segment") {
body.insert("segment".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("broadcaster_id") {
body.insert("broadcaster_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("extension_id") {
body.insert("extension_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("version") {
body.insert("version".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /extensions/configurations failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateExtensionSecretActor,
inports::<100>(extension_id, delay),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_extension_secret(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/extensions/jwt/secrets".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("extension_id") {
query_pairs.push(("extension_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("delay") {
query_pairs.push(("delay", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /extensions/jwt/secrets failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadExtensionSecretsActor,
inports::<100>(trigger),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_extension_secrets(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/extensions/jwt/secrets".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /extensions/jwt/secrets failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadExtensionLiveChannelsActor,
inports::<100>(extension_id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_extension_live_channels(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/extensions/live".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("extension_id") {
query_pairs.push(("extension_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /extensions/live failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchSendExtensionPubsubMessageActor,
inports::<100>(target, message, broadcaster_id, is_global_broadcast),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_send_extension_pubsub_message(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/extensions/pubsub".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("target") {
body.insert("target".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("message") {
body.insert("message".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("broadcaster_id") {
body.insert("broadcaster_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_global_broadcast") {
body.insert("is_global_broadcast".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /extensions/pubsub failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadReleasedExtensionsActor,
inports::<100>(extension_id, extension_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_released_extensions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/extensions/released".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("extension_id") {
query_pairs.push(("extension_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("extension_version") {
query_pairs.push(("extension_version", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /extensions/released failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadExtensionTransactionsActor,
inports::<100>(extension_id, id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_extension_transactions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/extensions/transactions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("extension_id") {
query_pairs.push(("extension_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /extensions/transactions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadGamesActor,
inports::<100>(id, name, igdb_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_games(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/games".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("name") {
query_pairs.push(("name", super::message_to_str(val)));
}
if let Some(val) = inputs.get("igdb_id") {
query_pairs.push(("igdb_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /games failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadTopGamesActor,
inports::<100>(first, after, before),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_top_games(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/games/top".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("before") {
query_pairs.push(("before", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /games/top failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadCreatorGoalsActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_creator_goals(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/goals".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /goals failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadChannelGuestStarSettingsActor,
inports::<100>(broadcaster_id, moderator_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_channel_guest_star_settings(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/channel_settings".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /guest_star/channel_settings failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateChannelGuestStarSettingsActor,
inports::<100>(broadcaster_id, group_layout, is_browser_source_audio_enabled, regenerate_browser_sources, is_moderator_send_live_enabled, slot_count),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_channel_guest_star_settings(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/channel_settings".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("group_layout") {
body.insert("group_layout".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_browser_source_audio_enabled") {
body.insert(
"is_browser_source_audio_enabled".to_string(),
val.clone().into(),
);
}
if let Some(val) = inputs.get("regenerate_browser_sources") {
body.insert("regenerate_browser_sources".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_moderator_send_live_enabled") {
body.insert(
"is_moderator_send_live_enabled".to_string(),
val.clone().into(),
);
}
if let Some(val) = inputs.get("slot_count") {
body.insert("slot_count".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /guest_star/channel_settings failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchSendGuestStarInviteActor,
inports::<100>(broadcaster_id, moderator_id, session_id, guest_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_send_guest_star_invite(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/invites".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("session_id") {
query_pairs.push(("session_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("guest_id") {
query_pairs.push(("guest_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /guest_star/invites failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadGuestStarInvitesActor,
inports::<100>(broadcaster_id, moderator_id, session_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_guest_star_invites(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/invites".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("session_id") {
query_pairs.push(("session_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /guest_star/invites failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteGuestStarInviteActor,
inports::<100>(broadcaster_id, moderator_id, session_id, guest_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_guest_star_invite(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/invites".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("session_id") {
query_pairs.push(("session_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("guest_id") {
query_pairs.push(("guest_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /guest_star/invites failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteGuestStarActor,
inports::<100>(broadcaster_id, session_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_guest_star(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/session".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("session_id") {
query_pairs.push(("session_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /guest_star/session failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadGuestStarSessionActor,
inports::<100>(broadcaster_id, moderator_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_guest_star_session(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/session".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /guest_star/session failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateGuestStarSessionActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_guest_star_session(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/session".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /guest_star/session failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteGuestStarSlotActor,
inports::<100>(broadcaster_id, moderator_id, session_id, guest_id, slot_id, should_reinvite_guest),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_guest_star_slot(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/slot".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("session_id") {
query_pairs.push(("session_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("guest_id") {
query_pairs.push(("guest_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("slot_id") {
query_pairs.push(("slot_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("should_reinvite_guest") {
query_pairs.push(("should_reinvite_guest", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /guest_star/slot failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateGuestStarSlotActor,
inports::<100>(broadcaster_id, moderator_id, session_id, source_slot_id, destination_slot_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_guest_star_slot(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/slot".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("session_id") {
query_pairs.push(("session_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("source_slot_id") {
query_pairs.push(("source_slot_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("destination_slot_id") {
query_pairs.push(("destination_slot_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /guest_star/slot failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateGuestStarActor,
inports::<100>(broadcaster_id, moderator_id, session_id, guest_id, slot_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_guest_star(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/slot".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("session_id") {
query_pairs.push(("session_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("guest_id") {
query_pairs.push(("guest_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("slot_id") {
query_pairs.push(("slot_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /guest_star/slot failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateGuestStarSlotSettingsActor,
inports::<100>(broadcaster_id, moderator_id, session_id, slot_id, is_audio_enabled, is_video_enabled, is_live, volume),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_guest_star_slot_settings(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/guest_star/slot_settings".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("session_id") {
query_pairs.push(("session_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("slot_id") {
query_pairs.push(("slot_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("is_audio_enabled") {
query_pairs.push(("is_audio_enabled", super::message_to_str(val)));
}
if let Some(val) = inputs.get("is_video_enabled") {
query_pairs.push(("is_video_enabled", super::message_to_str(val)));
}
if let Some(val) = inputs.get("is_live") {
query_pairs.push(("is_live", super::message_to_str(val)));
}
if let Some(val) = inputs.get("volume") {
query_pairs.push(("volume", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /guest_star/slot_settings failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadHypeTrainStatusActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_hype_train_status(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/hypetrain/status".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /hypetrain/status failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateAutomodSettingsActor,
inports::<100>(broadcaster_id, moderator_id, sex_based_terms, sexuality_sex_or_gender, bullying, aggression, disability, race_ethnicity_or_religion, misogyny, overall_level, swearing),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_automod_settings(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/automod/settings".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("sex_based_terms") {
body.insert("sex_based_terms".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("sexuality_sex_or_gender") {
body.insert("sexuality_sex_or_gender".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("bullying") {
body.insert("bullying".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("aggression") {
body.insert("aggression".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("disability") {
body.insert("disability".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("race_ethnicity_or_religion") {
body.insert("race_ethnicity_or_religion".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("misogyny") {
body.insert("misogyny".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("overall_level") {
body.insert("overall_level".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("swearing") {
body.insert("swearing".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /moderation/automod/settings failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadAutomodSettingsActor,
inports::<100>(broadcaster_id, moderator_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_automod_settings(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/automod/settings".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /moderation/automod/settings failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadBannedUsersActor,
inports::<100>(broadcaster_id, user_id, first, after, before),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_banned_users(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/banned".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("before") {
query_pairs.push(("before", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /moderation/banned failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteModerationActor,
inports::<100>(broadcaster_id, moderator_id, user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_moderation(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/bans".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /moderation/bans failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteBlockedTermActor,
inports::<100>(broadcaster_id, moderator_id, id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_blocked_term(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/blocked_terms".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /moderation/blocked_terms failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadBlockedTermsActor,
inports::<100>(broadcaster_id, moderator_id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_blocked_terms(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/blocked_terms".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /moderation/blocked_terms failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadModeratedChannelsActor,
inports::<100>(user_id, after, first),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_moderated_channels(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/channels".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /moderation/channels failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteChatMessagesActor,
inports::<100>(broadcaster_id, moderator_id, message_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_chat_messages(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/chat".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("message_id") {
query_pairs.push(("message_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /moderation/chat failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadModeratorsActor,
inports::<100>(broadcaster_id, user_id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_moderators(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/moderators".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /moderation/moderators failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteChannelModeratorActor,
inports::<100>(broadcaster_id, user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_channel_moderator(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/moderators".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /moderation/moderators failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadShieldModeStatusActor,
inports::<100>(broadcaster_id, moderator_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_shield_mode_status(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/shield_mode".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /moderation/shield_mode failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateShieldModeStatusActor,
inports::<100>(broadcaster_id, moderator_id, is_active),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_shield_mode_status(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/shield_mode".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("is_active") {
body.insert("is_active".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /moderation/shield_mode failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteSuspiciousStatusFromChatUserActor,
inports::<100>(broadcaster_id, moderator_id, user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_suspicious_status_from_chat_user(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/suspicious_users".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /moderation/suspicious_users failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateModerationActor,
inports::<100>(broadcaster_id, moderator_id, unban_request_id, status, resolution_text),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_moderation(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/unban_requests".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("unban_request_id") {
query_pairs.push(("unban_request_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("status") {
query_pairs.push(("status", super::message_to_str(val)));
}
if let Some(val) = inputs.get("resolution_text") {
query_pairs.push(("resolution_text", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /moderation/unban_requests failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadUnbanRequestsActor,
inports::<100>(broadcaster_id, moderator_id, status, user_id, after, first),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_unban_requests(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderation/unban_requests".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("moderator_id") {
query_pairs.push(("moderator_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("status") {
query_pairs.push(("status", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /moderation/unban_requests failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreatePollActor,
inports::<100>(title, duration, broadcaster_id, channel_points_voting_enabled, choices, channel_points_per_vote),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_poll(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/polls".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("title") {
body.insert("title".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("duration") {
body.insert("duration".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("broadcaster_id") {
body.insert("broadcaster_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("channel_points_voting_enabled") {
body.insert(
"channel_points_voting_enabled".to_string(),
val.clone().into(),
);
}
if let Some(val) = inputs.get("choices") {
body.insert("choices".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("channel_points_per_vote") {
body.insert("channel_points_per_vote".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /polls failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadPollsActor,
inports::<100>(broadcaster_id, id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_polls(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/polls".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /polls failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdatePollsActor,
inports::<100>(id, status, broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_polls(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/polls".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("id") {
body.insert("id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("status") {
body.insert("status".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("broadcaster_id") {
body.insert("broadcaster_id".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /polls failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdatePredictionsActor,
inports::<100>(winning_outcome_id, broadcaster_id, id, status),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_predictions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/predictions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("winning_outcome_id") {
body.insert("winning_outcome_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("broadcaster_id") {
body.insert("broadcaster_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("id") {
body.insert("id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("status") {
body.insert("status".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /predictions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadPredictionsActor,
inports::<100>(broadcaster_id, id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_predictions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/predictions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /predictions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreatePredictionActor,
inports::<100>(broadcaster_id, outcomes, prediction_window, title),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_prediction(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/predictions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("broadcaster_id") {
body.insert("broadcaster_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("outcomes") {
body.insert("outcomes".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("prediction_window") {
body.insert("prediction_window".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("title") {
body.insert("title".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /predictions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCancelARaidActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_cancel_a_raid(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/raids".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /raids failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchStartARaidActor,
inports::<100>(from_broadcaster_id, to_broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_start_a_raid(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/raids".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("from_broadcaster_id") {
query_pairs.push(("from_broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("to_broadcaster_id") {
query_pairs.push(("to_broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /raids failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadChannelStreamScheduleActor,
inports::<100>(broadcaster_id, id, start_time, utc_offset, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_channel_stream_schedule(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/schedule".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("start_time") {
query_pairs.push(("start_time", super::message_to_str(val)));
}
if let Some(val) = inputs.get("utc_offset") {
query_pairs.push(("utc_offset", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /schedule failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadChannelIcalendarActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_channel_icalendar(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/schedule/icalendar".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /schedule/icalendar failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateChannelStreamScheduleSegmentActor,
inports::<100>(broadcaster_id, id, start_time, category_id, is_canceled, duration, timezone, title),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_channel_stream_schedule_segment(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/schedule/segment".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("start_time") {
body.insert("start_time".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("category_id") {
body.insert("category_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("is_canceled") {
body.insert("is_canceled".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("duration") {
body.insert("duration".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("timezone") {
body.insert("timezone".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("title") {
body.insert("title".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /schedule/segment failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteChannelStreamScheduleSegmentActor,
inports::<100>(broadcaster_id, id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_channel_stream_schedule_segment(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/schedule/segment".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /schedule/segment failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateChannelStreamScheduleSegmentActor,
inports::<100>(broadcaster_id, is_recurring, timezone, title, category_id, start_time, duration),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_channel_stream_schedule_segment(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/schedule/segment".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("is_recurring") {
body.insert("is_recurring".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("timezone") {
body.insert("timezone".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("title") {
body.insert("title".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("category_id") {
body.insert("category_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("start_time") {
body.insert("start_time".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("duration") {
body.insert("duration".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /schedule/segment failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateChannelStreamScheduleActor,
inports::<100>(broadcaster_id, is_vacation_enabled, vacation_start_time, vacation_end_time, timezone),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_channel_stream_schedule(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/schedule/settings".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("is_vacation_enabled") {
query_pairs.push(("is_vacation_enabled", super::message_to_str(val)));
}
if let Some(val) = inputs.get("vacation_start_time") {
query_pairs.push(("vacation_start_time", super::message_to_str(val)));
}
if let Some(val) = inputs.get("vacation_end_time") {
query_pairs.push(("vacation_end_time", super::message_to_str(val)));
}
if let Some(val) = inputs.get("timezone") {
query_pairs.push(("timezone", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /schedule/settings failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchSearchCategoriesActor,
inports::<100>(query, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_search_categories(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/search/categories".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("query") {
query_pairs.push(("query", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /search/categories failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchSearchChannelsActor,
inports::<100>(query, live_only, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_search_channels(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/search/channels".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("query") {
query_pairs.push(("query", super::message_to_str(val)));
}
if let Some(val) = inputs.get("live_only") {
query_pairs.push(("live_only", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /search/channels failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadSharedChatSessionActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_shared_chat_session(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/shared_chat/session".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /shared_chat/session failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadStreamsActor,
inports::<100>(user_id, user_login, game_id, type_, language, first, before, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_streams(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/streams".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_login") {
query_pairs.push(("user_login", super::message_to_str(val)));
}
if let Some(val) = inputs.get("game_id") {
query_pairs.push(("game_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("type_") {
query_pairs.push(("type", super::message_to_str(val)));
}
if let Some(val) = inputs.get("language") {
query_pairs.push(("language", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("before") {
query_pairs.push(("before", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /streams failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadFollowedStreamsActor,
inports::<100>(user_id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_followed_streams(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/streams/followed".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /streams/followed failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadStreamKeyActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_stream_key(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/streams/key".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /streams/key failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateStreamMarkerActor,
inports::<100>(description, user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_stream_marker(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/streams/markers".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("user_id") {
body.insert("user_id".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /streams/markers failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadStreamMarkersActor,
inports::<100>(user_id, video_id, first, before, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_stream_markers(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/streams/markers".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("video_id") {
query_pairs.push(("video_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("before") {
query_pairs.push(("before", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /streams/markers failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadBroadcasterSubscriptionsActor,
inports::<100>(broadcaster_id, user_id, first, after, before),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_broadcaster_subscriptions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/subscriptions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("before") {
query_pairs.push(("before", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /subscriptions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchListSubscriptionsActor,
inports::<100>(broadcaster_id, user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_list_subscriptions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/subscriptions/user".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /subscriptions/user failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadTeamsActor,
inports::<100>(name, id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_teams(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/teams".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("name") {
query_pairs.push(("name", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /teams failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadChannelTeamsActor,
inports::<100>(broadcaster_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_channel_teams(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/teams/channel".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /teams/channel failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateUserActor,
inports::<100>(description),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_user(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/users".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("description") {
query_pairs.push(("description", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /users failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteUsersActor,
inports::<100>(target_user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_users(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/users/blocks".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("target_user_id") {
query_pairs.push(("target_user_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /users/blocks failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadUserBlockListActor,
inports::<100>(broadcaster_id, first, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_user_block_list(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/users/blocks".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users/blocks failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateUsersActor,
inports::<100>(target_user_id, source_context, reason),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_users(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/users/blocks".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("target_user_id") {
query_pairs.push(("target_user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("source_context") {
query_pairs.push(("source_context", super::message_to_str(val)));
}
if let Some(val) = inputs.get("reason") {
query_pairs.push(("reason", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /users/blocks failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadUserActiveExtensionsActor,
inports::<100>(user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_user_active_extensions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/users/extensions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users/extensions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchUpdateUserExtensionsActor,
inports::<100>(data),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_update_user_extensions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/users/extensions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("data") {
body.insert("data".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /users/extensions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadUserExtensionsActor,
inports::<100>(trigger),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_user_extensions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/users/extensions/list".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users/extensions/list failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchDeleteVideosActor,
inports::<100>(id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_delete_videos(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/videos".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /videos failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchReadVideosActor,
inports::<100>(id, user_id, game_id, language, period, sort, type_, first, after, before),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_read_videos(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/videos".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_id") {
query_pairs.push(("user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("game_id") {
query_pairs.push(("game_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("language") {
query_pairs.push(("language", super::message_to_str(val)));
}
if let Some(val) = inputs.get("period") {
query_pairs.push(("period", super::message_to_str(val)));
}
if let Some(val) = inputs.get("sort") {
query_pairs.push(("sort", super::message_to_str(val)));
}
if let Some(val) = inputs.get("type_") {
query_pairs.push(("type", super::message_to_str(val)));
}
if let Some(val) = inputs.get("first") {
query_pairs.push(("first", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("before") {
query_pairs.push(("before", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /videos failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchCreateClipFromVodActor,
inports::<100>(editor_id, broadcaster_id, vod_id, vod_offset, duration, title),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_create_clip_from_vod(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/videos/clips".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("editor_id") {
query_pairs.push(("editor_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("broadcaster_id") {
query_pairs.push(("broadcaster_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("vod_id") {
query_pairs.push(("vod_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("vod_offset") {
query_pairs.push(("vod_offset", super::message_to_str(val)));
}
if let Some(val) = inputs.get("duration") {
query_pairs.push(("duration", super::message_to_str(val)));
}
if let Some(val) = inputs.get("title") {
query_pairs.push(("title", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /videos/clips failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitchSendWhisperActor,
inports::<100>(from_user_id, to_user_id, message),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitch_send_whisper(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/whispers".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("from_user_id") {
query_pairs.push(("from_user_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("to_user_id") {
query_pairs.push(("to_user_id", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("message") {
body.insert("message".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /whispers failed: {}", e).into()),
);
}
}
Ok(output)
}