#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
const BASE_URL: &str = "https://api.ipgeolocation.io";
const ENV_KEY: &str = "IPGEOLOCATION_API_KEY";
fn apply_auth(
config: &reflow_actor::ActorConfig,
mut builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
let credential = config
.get_config_or_env(ENV_KEY)
.ok_or_else(|| anyhow::anyhow!("Missing env var: {}", ENV_KEY))?;
builder = builder.query(&[("apiKey", &credential)]);
Ok(builder)
}
#[actor(
IpgeolocationReadGeolocationActor,
inports::<100>(ip, fields, include),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn ipgeolocation_read_geolocation(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/ipgeo".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("ip") {
query_pairs.push(("ip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("fields") {
query_pairs.push(("fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("include") {
query_pairs.push(("include", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /ipgeo failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
IpgeolocationBatchGeolocationActor,
inports::<100>(ips),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn ipgeolocation_batch_geolocation(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/ipgeo-bulk".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("ips") {
body.insert("ips".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /ipgeo-bulk failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
IpgeolocationReadTimezoneActor,
inports::<100>(ip, lat, long),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn ipgeolocation_read_timezone(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/timezone".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("ip") {
query_pairs.push(("ip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("long") {
query_pairs.push(("long", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /timezone failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
IpgeolocationReadIpGeolocationActor,
inports::<100>(apiKey, ip, fields, excludes, include, lang, output),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn ipgeolocation_read_ip_geolocation(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/ipgeo".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("apiKey") {
query_pairs.push(("apiKey", super::message_to_str(val)));
}
if let Some(val) = inputs.get("ip") {
query_pairs.push(("ip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("fields") {
query_pairs.push(("fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("excludes") {
query_pairs.push(("excludes", super::message_to_str(val)));
}
if let Some(val) = inputs.get("include") {
query_pairs.push(("include", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lang") {
query_pairs.push(("lang", super::message_to_str(val)));
}
if let Some(val) = inputs.get("output") {
query_pairs.push(("output", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /ipgeo failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
IpgeolocationReadBulkIpGeolocationActor,
inports::<100>(apiKey, ips, fields, excludes, include, lang),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn ipgeolocation_read_bulk_ip_geolocation(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/ipgeo-bulk".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("apiKey") {
query_pairs.push(("apiKey", super::message_to_str(val)));
}
if let Some(val) = inputs.get("fields") {
query_pairs.push(("fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("excludes") {
query_pairs.push(("excludes", super::message_to_str(val)));
}
if let Some(val) = inputs.get("include") {
query_pairs.push(("include", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lang") {
query_pairs.push(("lang", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("ips") {
body.insert("ips".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /ipgeo-bulk failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
IpgeolocationReadClientIpActor,
inports::<100>(output),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn ipgeolocation_read_client_ip(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/getip".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("output") {
query_pairs.push(("output", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /getip failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
IpgeolocationReadIpSecurityActor,
inports::<100>(apiKey, ip, include, output),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn ipgeolocation_read_ip_security(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/ipgeo".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("apiKey") {
query_pairs.push(("apiKey", super::message_to_str(val)));
}
if let Some(val) = inputs.get("ip") {
query_pairs.push(("ip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("include") {
query_pairs.push(("include", super::message_to_str(val)));
}
if let Some(val) = inputs.get("output") {
query_pairs.push(("output", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /ipgeo failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
IpgeolocationReadAsnActor,
inports::<100>(apiKey, ip, asn, fields, output),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn ipgeolocation_read_asn(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/asn".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("apiKey") {
query_pairs.push(("apiKey", super::message_to_str(val)));
}
if let Some(val) = inputs.get("ip") {
query_pairs.push(("ip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("asn") {
query_pairs.push(("asn", super::message_to_str(val)));
}
if let Some(val) = inputs.get("fields") {
query_pairs.push(("fields", super::message_to_str(val)));
}
if let Some(val) = inputs.get("output") {
query_pairs.push(("output", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /asn failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
IpgeolocationReadAbuseContactActor,
inports::<100>(apiKey, ip, output),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn ipgeolocation_read_abuse_contact(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/abuse-contact".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("apiKey") {
query_pairs.push(("apiKey", super::message_to_str(val)));
}
if let Some(val) = inputs.get("ip") {
query_pairs.push(("ip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("output") {
query_pairs.push(("output", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /abuse-contact failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
IpgeolocationReadUserAgentActor,
inports::<100>(apiKey, uaString, output),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn ipgeolocation_read_user_agent(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/user-agent".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("apiKey") {
query_pairs.push(("apiKey", super::message_to_str(val)));
}
if let Some(val) = inputs.get("uaString") {
query_pairs.push(("uaString", super::message_to_str(val)));
}
if let Some(val) = inputs.get("output") {
query_pairs.push(("output", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /user-agent failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
IpgeolocationReadAstronomyActor,
inports::<100>(apiKey, ip, lat, long, location, date, output),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn ipgeolocation_read_astronomy(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/astronomy".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("apiKey") {
query_pairs.push(("apiKey", super::message_to_str(val)));
}
if let Some(val) = inputs.get("ip") {
query_pairs.push(("ip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("long") {
query_pairs.push(("long", super::message_to_str(val)));
}
if let Some(val) = inputs.get("location") {
query_pairs.push(("location", super::message_to_str(val)));
}
if let Some(val) = inputs.get("date") {
query_pairs.push(("date", super::message_to_str(val)));
}
if let Some(val) = inputs.get("output") {
query_pairs.push(("output", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /astronomy failed: {}", e).into()),
);
}
}
Ok(output)
}