#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, ClientBuilderExt, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
const BASE_URL: &str = "https://api.twitter.com/2";
const ENV_KEY: &str = "TWITTER_API_KEY";
fn apply_auth(
config: &reflow_actor::ActorConfig,
mut builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
let credential = config
.get_config_or_env(ENV_KEY)
.ok_or_else(|| anyhow::anyhow!("Missing env var: {}", ENV_KEY))?;
builder = builder.header("Authorization", format!("Bearer {}", credential));
Ok(builder)
}
#[actor(
TwitterCreateTweetActor,
inports::<100>(text),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_create_tweet(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/tweets".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("text") {
body.insert("text".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /tweets failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterSearchTweetsActor,
inports::<100>(query),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_search_tweets(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/tweets/search/recent".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("query") {
query_pairs.push(("query", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /tweets/search/recent failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterDeleteTweetActor,
inports::<100>(id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_delete_tweet(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/tweets/{id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /tweets/{{id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterReadTweetActor,
inports::<100>(id, tweet_fields, expansions, user_fields, media_fields, place_fields, poll_fields),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_read_tweet(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/tweets/{id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("tweet_fields") {
query_pairs.push(("tweet.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_fields") {
query_pairs.push(("user.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("media_fields") {
query_pairs.push(("media.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("place_fields") {
query_pairs.push(("place.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("poll_fields") {
query_pairs.push(("poll.fields", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /tweets/{{id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterReadTweetsActor,
inports::<100>(ids, tweet_fields, expansions, user_fields),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_read_tweets(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/tweets".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("ids") {
query_pairs.push(("ids", super::message_to_str(val)));
}
if let Some(val) = inputs.get("tweet_fields") {
query_pairs.push(("tweet.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_fields") {
query_pairs.push(("user.fields", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /tweets failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterReadTweetCountsActor,
inports::<100>(query, start_time, end_time, granularity),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_read_tweet_counts(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/tweets/counts/recent".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("query") {
query_pairs.push(("query", super::message_to_str(val)));
}
if let Some(val) = inputs.get("start_time") {
query_pairs.push(("start_time", super::message_to_str(val)));
}
if let Some(val) = inputs.get("end_time") {
query_pairs.push(("end_time", super::message_to_str(val)));
}
if let Some(val) = inputs.get("granularity") {
query_pairs.push(("granularity", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /tweets/counts/recent failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterReadUserActor,
inports::<100>(id, user_fields, expansions, tweet_fields),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_read_user(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_fields") {
query_pairs.push(("user.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if let Some(val) = inputs.get("tweet_fields") {
query_pairs.push(("tweet.fields", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users/{{id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterReadUsersActor,
inports::<100>(ids, user_fields, expansions),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_read_users(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/users".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("ids") {
query_pairs.push(("ids", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_fields") {
query_pairs.push(("user.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterReadMeActor,
inports::<100>(user_fields, expansions, tweet_fields),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_read_me(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/users/me".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("user_fields") {
query_pairs.push(("user.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if let Some(val) = inputs.get("tweet_fields") {
query_pairs.push(("tweet.fields", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users/me failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterListUserTweetsActor,
inports::<100>(id, max_results, pagination_token, start_time, end_time, since_id, until_id, tweet_fields, expansions, exclude),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_list_user_tweets(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/tweets".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("max_results") {
query_pairs.push(("max_results", super::message_to_str(val)));
}
if let Some(val) = inputs.get("pagination_token") {
query_pairs.push(("pagination_token", super::message_to_str(val)));
}
if let Some(val) = inputs.get("start_time") {
query_pairs.push(("start_time", super::message_to_str(val)));
}
if let Some(val) = inputs.get("end_time") {
query_pairs.push(("end_time", super::message_to_str(val)));
}
if let Some(val) = inputs.get("since_id") {
query_pairs.push(("since_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("until_id") {
query_pairs.push(("until_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("tweet_fields") {
query_pairs.push(("tweet.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if let Some(val) = inputs.get("exclude") {
query_pairs.push(("exclude", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users/{{id}}/tweets failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterListUserMentionsActor,
inports::<100>(id, max_results, pagination_token, start_time, end_time, tweet_fields, expansions),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_list_user_mentions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/mentions".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("max_results") {
query_pairs.push(("max_results", super::message_to_str(val)));
}
if let Some(val) = inputs.get("pagination_token") {
query_pairs.push(("pagination_token", super::message_to_str(val)));
}
if let Some(val) = inputs.get("start_time") {
query_pairs.push(("start_time", super::message_to_str(val)));
}
if let Some(val) = inputs.get("end_time") {
query_pairs.push(("end_time", super::message_to_str(val)));
}
if let Some(val) = inputs.get("tweet_fields") {
query_pairs.push(("tweet.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users/{{id}}/mentions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterListFollowersActor,
inports::<100>(id, max_results, pagination_token, user_fields, expansions),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_list_followers(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/followers".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("max_results") {
query_pairs.push(("max_results", super::message_to_str(val)));
}
if let Some(val) = inputs.get("pagination_token") {
query_pairs.push(("pagination_token", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_fields") {
query_pairs.push(("user.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users/{{id}}/followers failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterListFollowingActor,
inports::<100>(id, max_results, pagination_token, user_fields),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_list_following(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/following".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("max_results") {
query_pairs.push(("max_results", super::message_to_str(val)));
}
if let Some(val) = inputs.get("pagination_token") {
query_pairs.push(("pagination_token", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_fields") {
query_pairs.push(("user.fields", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users/{{id}}/following failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterCreateFollowActor,
inports::<100>(id, target_user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_create_follow(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/following".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("target_user_id") {
body.insert("target_user_id".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /users/{{id}}/following failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterDeleteFollowActor,
inports::<100>(source_user_id, target_user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_delete_follow(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{source_user_id}/following/{target_user_id}".to_string();
if let Some(val) = inputs.get("source_user_id") {
endpoint = endpoint.replace("{{source_user_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("target_user_id") {
endpoint = endpoint.replace("{{target_user_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /users/{{source_user_id}}/following/{{target_user_id}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
TwitterCreateLikeActor,
inports::<100>(id, tweet_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_create_like(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/likes".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("tweet_id") {
body.insert("tweet_id".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /users/{{id}}/likes failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterDeleteLikeActor,
inports::<100>(id, tweet_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_delete_like(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/likes/{tweet_id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("tweet_id") {
endpoint = endpoint.replace("{{tweet_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /users/{{id}}/likes/{{tweet_id}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
TwitterListLikedTweetsActor,
inports::<100>(id, max_results, pagination_token, tweet_fields),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_list_liked_tweets(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/liked_tweets".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("max_results") {
query_pairs.push(("max_results", super::message_to_str(val)));
}
if let Some(val) = inputs.get("pagination_token") {
query_pairs.push(("pagination_token", super::message_to_str(val)));
}
if let Some(val) = inputs.get("tweet_fields") {
query_pairs.push(("tweet.fields", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users/{{id}}/liked_tweets failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterCreateRetweetActor,
inports::<100>(id, tweet_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_create_retweet(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/retweets".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("tweet_id") {
body.insert("tweet_id".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /users/{{id}}/retweets failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterDeleteRetweetActor,
inports::<100>(id, source_tweet_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_delete_retweet(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/retweets/{source_tweet_id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("source_tweet_id") {
endpoint = endpoint.replace("{{source_tweet_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /users/{{id}}/retweets/{{source_tweet_id}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
TwitterListListsActor,
inports::<100>(id, max_results, pagination_token, list_fields),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_list_lists(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/owned_lists".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("max_results") {
query_pairs.push(("max_results", super::message_to_str(val)));
}
if let Some(val) = inputs.get("pagination_token") {
query_pairs.push(("pagination_token", super::message_to_str(val)));
}
if let Some(val) = inputs.get("list_fields") {
query_pairs.push(("list.fields", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /users/{{id}}/owned_lists failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterCreateListActor,
inports::<100>(name, description, private),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_create_list(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/lists".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("private") {
body.insert("private".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /lists failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterReadListActor,
inports::<100>(id, list_fields, expansions),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_read_list(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/lists/{id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("list_fields") {
query_pairs.push(("list.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /lists/{{id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterUpdateListActor,
inports::<100>(id, name, description, private),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_update_list(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/lists/{id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.put(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("private") {
body.insert("private".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PUT /lists/{{id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterDeleteListActor,
inports::<100>(id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_delete_list(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/lists/{id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /lists/{{id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterListListTweetsActor,
inports::<100>(id, max_results, pagination_token, tweet_fields),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_list_list_tweets(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/lists/{id}/tweets".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("max_results") {
query_pairs.push(("max_results", super::message_to_str(val)));
}
if let Some(val) = inputs.get("pagination_token") {
query_pairs.push(("pagination_token", super::message_to_str(val)));
}
if let Some(val) = inputs.get("tweet_fields") {
query_pairs.push(("tweet.fields", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /lists/{{id}}/tweets failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterListDmConversationsActor,
inports::<100>(dm_conversation_id, max_results, pagination_token, dm_event_fields, expansions),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_list_dm_conversations(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/dm_conversations/{dm_conversation_id}/dm_events".to_string();
if let Some(val) = inputs.get("dm_conversation_id") {
endpoint = endpoint.replace("{{dm_conversation_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("max_results") {
query_pairs.push(("max_results", super::message_to_str(val)));
}
if let Some(val) = inputs.get("pagination_token") {
query_pairs.push(("pagination_token", super::message_to_str(val)));
}
if let Some(val) = inputs.get("dm_event_fields") {
query_pairs.push(("dm_event.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /dm_conversations/{{dm_conversation_id}}/dm_events failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
TwitterSendDmActor,
inports::<100>(conversation_type, participant_ids, message, dm_conversation_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_send_dm(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/dm_conversations".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("conversation_type") {
body.insert("conversation_type".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("participant_ids") {
body.insert("participant_ids".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("message") {
body.insert("message".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("dm_conversation_id") {
body.insert("dm_conversation_id".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /dm_conversations failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterListSpacesActor,
inports::<100>(query, state, max_results, space_fields, expansions),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_list_spaces(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/spaces/search".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("query") {
query_pairs.push(("query", super::message_to_str(val)));
}
if let Some(val) = inputs.get("state") {
query_pairs.push(("state", super::message_to_str(val)));
}
if let Some(val) = inputs.get("max_results") {
query_pairs.push(("max_results", super::message_to_str(val)));
}
if let Some(val) = inputs.get("space_fields") {
query_pairs.push(("space.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /spaces/search failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterReadSpaceActor,
inports::<100>(id, space_fields, expansions),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_read_space(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/spaces/{id}".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("space_fields") {
query_pairs.push(("space.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /spaces/{{id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterListTrendsActor,
inports::<100>(woeid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_list_trends(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/trends/by/woeid/{woeid}".to_string();
if let Some(val) = inputs.get("woeid") {
endpoint = endpoint.replace("{{woeid}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /trends/by/woeid/{{woeid}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterCreateBlockActor,
inports::<100>(id, target_user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_create_block(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/blocking".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("target_user_id") {
body.insert("target_user_id".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /users/{{id}}/blocking failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterDeleteBlockActor,
inports::<100>(source_user_id, target_user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_delete_block(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{source_user_id}/blocking/{target_user_id}".to_string();
if let Some(val) = inputs.get("source_user_id") {
endpoint = endpoint.replace("{{source_user_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("target_user_id") {
endpoint = endpoint.replace("{{target_user_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /users/{{source_user_id}}/blocking/{{target_user_id}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
TwitterCreateMuteActor,
inports::<100>(id, target_user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_create_mute(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{id}/muting".to_string();
if let Some(val) = inputs.get("id") {
endpoint = endpoint.replace("{{id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("target_user_id") {
body.insert("target_user_id".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /users/{{id}}/muting failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterDeleteMuteActor,
inports::<100>(source_user_id, target_user_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_delete_mute(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/users/{source_user_id}/muting/{target_user_id}".to_string();
if let Some(val) = inputs.get("source_user_id") {
endpoint = endpoint.replace("{{source_user_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("target_user_id") {
endpoint = endpoint.replace("{{target_user_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /users/{{source_user_id}}/muting/{{target_user_id}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
TwitterCreateStreamRuleActor,
inports::<100>(add, delete),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_create_stream_rule(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/tweets/search/stream/rules".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("add") {
body.insert("add".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("delete") {
body.insert("delete".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /tweets/search/stream/rules failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterListStreamRulesActor,
inports::<100>(ids),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_list_stream_rules(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/tweets/search/stream/rules".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("ids") {
query_pairs.push(("ids", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /tweets/search/stream/rules failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
TwitterReadStreamActor,
inports::<100>(tweet_fields, expansions, user_fields, backfill_minutes),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn twitter_read_stream(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/tweets/search/stream".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("tweet_fields") {
query_pairs.push(("tweet.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("expansions") {
query_pairs.push(("expansions", super::message_to_str(val)));
}
if let Some(val) = inputs.get("user_fields") {
query_pairs.push(("user.fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("backfill_minutes") {
query_pairs.push(("backfill_minutes", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /tweets/search/stream failed: {}", e).into()),
);
}
}
Ok(output)
}