#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, ClientBuilderExt, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
const BASE_URL: &str = "https://api.airtable.com/v0";
const ENV_KEY: &str = "AIRTABLE_API_KEY";
fn apply_auth(
config: &reflow_actor::ActorConfig,
mut builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
let credential = config
.get_config_or_env(ENV_KEY)
.ok_or_else(|| anyhow::anyhow!("Missing env var: {}", ENV_KEY))?;
builder = builder.header("Authorization", format!("Bearer {}", credential));
Ok(builder)
}
#[actor(
AirtableListRecordsActor,
inports::<100>(baseId, tableIdOrName, maxRecords, view),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_list_records(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{baseId}/{tableIdOrName}".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("tableIdOrName") {
endpoint = endpoint.replace("{{tableIdOrName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("maxRecords") {
query_pairs.push(("maxRecords", super::message_to_str(val)));
}
if let Some(val) = inputs.get("view") {
query_pairs.push(("view", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /{{baseId}}/{{tableIdOrName}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AirtableCreateRecordActor,
inports::<100>(baseId, tableIdOrName, records),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_create_record(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{baseId}/{tableIdOrName}".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("tableIdOrName") {
endpoint = endpoint.replace("{{tableIdOrName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("records") {
body.insert("records".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /{{baseId}}/{{tableIdOrName}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AirtableUpdateRecordActor,
inports::<100>(baseId, tableIdOrName, records),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_update_record(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{baseId}/{tableIdOrName}".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("tableIdOrName") {
endpoint = endpoint.replace("{{tableIdOrName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("records") {
body.insert("records".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /{{baseId}}/{{tableIdOrName}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AirtableReadRecordActor,
inports::<100>(baseId, tableIdOrName, recordId, returnFieldsByFieldId, cellFormat),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_read_record(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{baseId}/{tableIdOrName}/{recordId}".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("tableIdOrName") {
endpoint = endpoint.replace("{{tableIdOrName}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("recordId") {
endpoint = endpoint.replace("{{recordId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("returnFieldsByFieldId") {
query_pairs.push(("returnFieldsByFieldId", super::message_to_str(val)));
}
if let Some(val) = inputs.get("cellFormat") {
query_pairs.push(("cellFormat", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /{{baseId}}/{{tableIdOrName}}/{{recordId}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
AirtableUpdateRecordsActor,
inports::<100>(baseId, tableIdOrName, records, typecast, performUpsert),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_update_records(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{baseId}/{tableIdOrName}".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("tableIdOrName") {
endpoint = endpoint.replace("{{tableIdOrName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("records") {
body.insert("records".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("typecast") {
body.insert("typecast".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("performUpsert") {
body.insert("performUpsert".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("PATCH /{{baseId}}/{{tableIdOrName}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AirtableDeleteRecordActor,
inports::<100>(baseId, tableIdOrName, recordId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_delete_record(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{baseId}/{tableIdOrName}/{recordId}".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("tableIdOrName") {
endpoint = endpoint.replace("{{tableIdOrName}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("recordId") {
endpoint = endpoint.replace("{{recordId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /{{baseId}}/{{tableIdOrName}}/{{recordId}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
AirtableDeleteRecordsActor,
inports::<100>(baseId, tableIdOrName, records),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_delete_records(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/{baseId}/{tableIdOrName}".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("tableIdOrName") {
endpoint = endpoint.replace("{{tableIdOrName}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("records") {
query_pairs.push(("records[]", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /{{baseId}}/{{tableIdOrName}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
AirtableListBasesActor,
inports::<100>(offset),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_list_bases(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/meta/bases".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("offset") {
query_pairs.push(("offset", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /meta/bases failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AirtableReadBaseSchemaActor,
inports::<100>(baseId, include),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_read_base_schema(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/meta/bases/{baseId}/tables".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("include") {
query_pairs.push(("include", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /meta/bases/{{baseId}}/tables failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AirtableCreateTableActor,
inports::<100>(baseId, name, description, fields),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_create_table(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/meta/bases/{baseId}/tables".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("fields") {
body.insert("fields".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /meta/bases/{{baseId}}/tables failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AirtableUpdateTableActor,
inports::<100>(baseId, tableId, name, description),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_update_table(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/meta/bases/{baseId}/tables/{tableId}".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("tableId") {
endpoint = endpoint.replace("{{tableId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"PATCH /meta/bases/{{baseId}}/tables/{{tableId}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
AirtableCreateFieldActor,
inports::<100>(baseId, tableId, name, type_, description, options),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_create_field(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/meta/bases/{baseId}/tables/{tableId}/fields".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("tableId") {
endpoint = endpoint.replace("{{tableId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("type_") {
body.insert("type".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("options") {
body.insert("options".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"POST /meta/bases/{{baseId}}/tables/{{tableId}}/fields failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
AirtableUpdateFieldActor,
inports::<100>(baseId, tableId, fieldId, name, description, options),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_update_field(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/meta/bases/{baseId}/tables/{tableId}/fields/{fieldId}".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("tableId") {
endpoint = endpoint.replace("{{tableId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("fieldId") {
endpoint = endpoint.replace("{{fieldId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.patch(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("description") {
body.insert("description".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("options") {
body.insert("options".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("PATCH /meta/bases/{{baseId}}/tables/{{tableId}}/fields/{{fieldId}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
AirtableCreateWebhookActor,
inports::<100>(baseId, notificationUrl, specification),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_create_webhook(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/bases/{baseId}/webhooks".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("notificationUrl") {
body.insert("notificationUrl".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("specification") {
body.insert("specification".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /bases/{{baseId}}/webhooks failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AirtableListWebhooksActor,
inports::<100>(baseId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_list_webhooks(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/bases/{baseId}/webhooks".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /bases/{{baseId}}/webhooks failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AirtableDeleteWebhookActor,
inports::<100>(baseId, webhookId),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_delete_webhook(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/bases/{baseId}/webhooks/{webhookId}".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("webhookId") {
endpoint = endpoint.replace("{{webhookId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /bases/{{baseId}}/webhooks/{{webhookId}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
AirtableListWebhookPayloadsActor,
inports::<100>(baseId, webhookId, cursor),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn airtable_list_webhook_payloads(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/bases/{baseId}/webhooks/{webhookId}/payloads".to_string();
if let Some(val) = inputs.get("baseId") {
endpoint = endpoint.replace("{{baseId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("webhookId") {
endpoint = endpoint.replace("{{webhookId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("cursor") {
query_pairs.push(("cursor", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /bases/{{baseId}}/webhooks/{{webhookId}}/payloads failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}