use crate::{
collection::RecipeId,
render::{FunctionError, SelectOption, TemplateContext},
};
use base64::{Engine, prelude::BASE64_STANDARD};
use bytes::Bytes;
use derive_more::FromStr;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use regex::Regex;
use serde::{Deserialize, de::IntoDeserializer};
use slumber_macros::template;
use slumber_template::{
Expected, RenderError, StreamSource, TryFromValue, Value, ValueError,
ValueStream, WithValue, impl_try_from_value_str,
};
use slumber_util::{TimeSpan, paths::expand_home};
use std::{
env, fmt::Debug, io, path::PathBuf, process::Stdio, str::FromStr, sync::Arc,
};
use tokio::{
fs::File,
io::{AsyncRead, AsyncWriteExt},
process::Command,
};
use tokio_util::io::ReaderStream;
use tracing::{Instrument, debug, debug_span};
/// Encode bytes to base64, or decode base64 back into bytes.
///
/// Uses the standard base64 alphabet (`BASE64_STANDARD`). When `decode` is
/// `false` the input is encoded; when `true` the input is treated as base64
/// text and decoded.
///
/// # Errors
/// Returns a [`FunctionError`] (converted from the base64 decode error) when
/// `decode` is set and `value` is not valid base64.
#[template]
pub fn base64(
    value: Bytes,
    #[kwarg] decode: bool,
) -> Result<Bytes, FunctionError> {
    // Encoding can't fail, so handle it first and get it out of the way
    if !decode {
        return Ok(BASE64_STANDARD.encode(&value).into());
    }
    let decoded = BASE64_STANDARD.decode(&value)?;
    Ok(Bytes::from(decoded))
}
/// Convert a template value to a boolean.
///
/// The conversion rules are whatever [`Value::to_bool`] defines; this
/// function only forwards to it.
#[template]
pub fn boolean(value: Value) -> bool {
    value.to_bool()
}
/// Run an external program and stream its stdout.
///
/// `command` is `[program, arguments...]`. The child is spawned with its
/// working directory set to `cwd` joined onto `context.root_dir` (an omitted
/// `cwd` joins the empty string). If `stdin` is given, it is written to the
/// child's stdin before stdout is read. The child's stderr is discarded.
///
/// The returned stream yields chunks of stdout followed by one final item
/// derived from the exit status: an empty chunk on success, or an error if
/// the process exited unsuccessfully.
///
/// # Errors
/// - `FunctionError::CommandEmpty` is returned immediately if `command` has
///   no elements.
/// - `FunctionError::CommandInit` is emitted through the stream if spawning
///   the process, writing stdin, or waiting on the process fails with an I/O
///   error.
/// - `FunctionError::CommandStatus` is emitted through the stream if the
///   process exits with a non-success status.
#[template]
pub fn command(
    #[context] context: &TemplateContext,
    command: Vec<String>,
    #[kwarg] cwd: Option<String>,
    #[kwarg] stdin: Option<Bytes>,
) -> Result<ValueStream, FunctionError> {
    /// Attach the program and its arguments to an I/O error
    fn init_error(
        program: &str,
        arguments: &[String],
        error: io::Error,
    ) -> RenderError {
        FunctionError::CommandInit {
            program: program.to_owned(),
            arguments: arguments.to_owned(),
            error,
        }
        .into()
    }

    let working_dir = context.root_dir.join(cwd.unwrap_or_default());

    let Some((program, arguments)) = command.split_first() else {
        return Err(FunctionError::CommandEmpty);
    };
    // Owned copies are needed because the future below outlives this call.
    // These bindings are deliberately named `program`/`arguments` because the
    // span records its fields under the variable names.
    let program = program.clone();
    let arguments = arguments.to_vec();

    let span = debug_span!("command()", ?program, ?arguments);
    let exit_span = span.clone();

    let spawn_and_stream = async move {
        debug!("Spawning");
        let mut child = Command::new(&program)
            .args(&arguments)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::null())
            .current_dir(working_dir)
            .kill_on_drop(true)
            .spawn()
            .map_err(|error| init_error(&program, &arguments, error))?;

        // NOTE(review): stdin is fully written before anything reads stdout.
        // A child that fills its stdout pipe before draining a large stdin
        // could presumably stall here - confirm whether that matters.
        if let Some(input) = stdin {
            let pipe = child.stdin.as_mut().expect("Process missing stdin");
            pipe.write_all(&input)
                .await
                .map_err(|error| init_error(&program, &arguments, error))?;
        }

        // Stdout is streamed from here while a separate task waits on the
        // process; the wait result becomes the final stream item.
        let stdout = child.stdout.take().expect("stdout not set for child");
        let waiter = tokio::spawn(async move { child.wait().await });

        let exit_status = async move {
            let status_result = waiter.await;
            debug!(?status_result, "Finished");
            let status = status_result
                // Outer error: the waiting task itself failed to join
                .map_err(RenderError::other)?
                // Inner error: waiting on the process failed
                .map_err(|error| init_error(&program, &arguments, error))?;
            if status.success() {
                // Contributes no bytes; only signals the stream finished ok
                Ok(Bytes::new())
            } else {
                Err(FunctionError::CommandStatus {
                    program,
                    arguments,
                    status,
                }
                .into())
            }
        }
        .instrument(exit_span);

        Ok(reader_stream(stdout).chain(exit_status.into_stream()))
    }
    .instrument(span);

    let stream = spawn_and_stream.try_flatten_stream().boxed();
    Ok(ValueStream::Stream {
        source: StreamSource::Command { command },
        stream,
    })
}
#[template]
pub fn concat(elements: Vec<String>) -> String {
elements.into_iter().join("")
}
/// Print a value's `Debug` representation to stdout and pass the value
/// through unchanged, so it can be wrapped around any expression.
#[template]
pub fn debug(value: Value) -> Value {
    println!("{value:?}");
    value
}
/// Read an environment variable, falling back to `default`.
///
/// The fallback is used whenever `std::env::var` returns an error, i.e. the
/// variable is unset or its value is not valid Unicode.
#[template]
pub fn env(variable: String, #[kwarg] default: String) -> String {
    match env::var(variable) {
        Ok(value) => value,
        Err(_) => default,
    }
}
/// Stream the contents of a file.
///
/// `path` is passed through `expand_home` (presumably expands a leading `~`;
/// the helper lives in `slumber_util`) and then joined onto
/// `context.root_dir`. The file is not opened until the returned stream is
/// polled.
///
/// # Errors
/// If the file can't be opened, the stream yields `FunctionError::File`
/// carrying the resolved path and the I/O error.
#[template]
pub fn file(#[context] context: &TemplateContext, path: String) -> ValueStream {
    let expanded = expand_home(PathBuf::from(path));
    let path = context.root_dir.join(expanded);
    // The stream source keeps its own copy; the future consumes the other
    let source = StreamSource::File { path: path.clone() };

    let open_and_stream = async move {
        let file = File::open(&path)
            .await
            .map_err(|error| FunctionError::File { path, error })?;
        Ok(reader_stream(file))
    };
    let stream = open_and_stream.try_flatten_stream().boxed();

    ValueStream::Stream { source, stream }
}
/// Convert a value to a float.
///
/// - Floats pass through; integers are cast
/// - `null` becomes `0.0`; booleans become `0.0`/`1.0`
/// - Strings are parsed as floats; bytes must be UTF-8 and are then parsed
///
/// # Errors
/// Returns a [`ValueError`] if a string/bytes value fails to parse (or bytes
/// are not UTF-8), or if the value is an array or object.
#[template]
pub fn float(value: Value) -> Result<f64, ValueError> {
    let converted: f64 = match value {
        Value::Float(f) => f,
        Value::Integer(i) => i as f64,
        Value::Boolean(b) => f64::from(b),
        Value::Null => 0.0,
        Value::String(s) => s.parse()?,
        Value::Bytes(bytes) => std::str::from_utf8(&bytes)?.parse()?,
        Value::Array(_) | Value::Object(_) => {
            return Err(ValueError::Type {
                expected: Expected::OneOf(&[
                    &Expected::Float,
                    &Expected::Integer,
                    &Expected::Boolean,
                    &Expected::Custom("string/bytes that parse to a float"),
                ]),
            });
        }
    };
    Ok(converted)
}
/// Get a single element from a string, bytes, or array.
///
/// `index` is first normalized by `Sequence::wrap_index`, which maps negative
/// indexes onto positions counted from the end. Strings are indexed by
/// character and produce a one-character string; bytes produce a one-byte
/// slice; arrays produce the element itself.
///
/// Returns `None` if the normalized index is past the end of the sequence.
#[template]
pub fn index(index: i64, sequence: Sequence) -> Option<Value> {
    let position = sequence.wrap_index(index);
    let in_bounds = position < sequence.len() as usize;
    if !in_bounds {
        return None;
    }

    let element: Value = match sequence {
        Sequence::String(string) => {
            // Bounds were checked against the char count, so this exists
            let character = string.chars().nth(position).unwrap();
            character.to_string().into()
        }
        Sequence::Bytes(bytes) => bytes.slice(position..=position).into(),
        // The array is owned and discarded, so a swap is fine for removal
        Sequence::Array(mut array) => array.swap_remove(position),
    };
    Some(element)
}
/// Convert a value to an integer.
///
/// - Integers pass through; floats are cast with `as i64`
/// - `null` becomes `0`; booleans become `0`/`1`
/// - Strings are parsed as integers; bytes must be UTF-8 and are then parsed
///
/// # Errors
/// Returns a [`ValueError`] if a string/bytes value fails to parse (or bytes
/// are not UTF-8), or if the value is an array or object.
#[template]
pub fn integer(value: Value) -> Result<i64, ValueError> {
    let converted: i64 = match value {
        Value::Integer(i) => i,
        Value::Float(f) => f as i64,
        Value::Boolean(b) => i64::from(b),
        Value::Null => 0,
        Value::String(s) => s.parse()?,
        Value::Bytes(bytes) => std::str::from_utf8(&bytes)?.parse()?,
        Value::Array(_) | Value::Object(_) => {
            return Err(ValueError::Type {
                expected: Expected::OneOf(&[
                    &Expected::Integer,
                    &Expected::Float,
                    &Expected::Boolean,
                    &Expected::Custom("string/bytes that parse to an integer"),
                ]),
            });
        }
    };
    Ok(converted)
}
/// Join a list of strings into one string, placing `separator` between each
/// pair of adjacent elements.
#[template]
pub fn join(separator: String, values: Vec<String>) -> String {
    values.join(separator.as_str())
}
#[template]
pub fn jq(
query: JaqQuery,
value: JsonValue, #[kwarg] mode: JsonQueryMode,
) -> Result<Value, FunctionError> {
use jaq_core::{Ctx, RcIter};
let inputs = RcIter::new(core::iter::empty());
let results = query
.filter
.run((Ctx::new([], &inputs), jaq_json::Val::from(value.0)));
let items = results
.collect::<Result<Vec<_>, _>>()
.map_err(|error| FunctionError::Jq(error.to_string()))?;
mode.get_values(query.query, items.into_iter().map(serde_json::Value::from))
}
/// A jq query that has been parsed and compiled, ready to run.
struct JaqQuery {
    /// Original query text, kept so error messages can quote it
    query: String,
    /// Compiled form of the query
    filter: jaq_core::Filter<jaq_core::Native<jaq_json::Val>>,
}
impl FromStr for JaqQuery {
type Err = ValueError;
fn from_str(query: &str) -> Result<Self, ValueError> {
use jaq_core::{
Compiler,
load::{Arena, File, Loader},
};
fn format_errors<E>(errors: Vec<E>, f: impl Fn(E) -> String) -> String {
errors
.into_iter()
.map(f)
.format("; ")
.to_string()
}
let program = File {
code: query,
path: (),
};
let loader = Loader::new(jaq_std::defs());
let arena = Arena::default();
let modules = loader.load(&arena, program).map_err(|errors| {
ValueError::other(format_errors(errors, |(_, error)| match error {
jaq_core::load::Error::Io(items) => {
format_errors(items, |(path, error)| {
format!("error loading `{path}`: {error}")
})
}
jaq_core::load::Error::Lex(items) => {
format_errors(items, |(expected, actual)| {
format!(
"expected {expected}, got `{actual}`",
expected = expected.as_str()
)
})
}
jaq_core::load::Error::Parse(items) => {
format_errors(items, |(expected, actual)| {
format!(
"expected {expected}, got `{actual}`",
expected = expected.as_str()
)
})
}
}))
})?;
let filter = Compiler::default()
.with_funs(jaq_std::funs())
.compile(modules)
.map_err(|errors| {
ValueError::other(format_errors(errors, |(_, errors)| {
format_errors(errors, |(function, _)| {
format!("Undefined function `{function}`")
})
}))
})?;
Ok(Self {
query: query.to_owned(),
filter,
})
}
}
impl_try_from_value_str!(JaqQuery);
#[template]
pub fn json_parse(value: String) -> Result<serde_json::Value, FunctionError> {
serde_json::from_str(&value).map_err(FunctionError::JsonParse)
}
#[template]
pub fn jsonpath(
query: JsonPath,
value: JsonValue, #[kwarg] mode: JsonQueryMode,
) -> Result<Value, FunctionError> {
let query = query.0;
let node_list = query.query(&value.0);
mode.get_values(query.to_string(), node_list.into_iter().cloned())
}
/// Wrapper around a parsed JSONPath query. Parsing from a string is derived
/// (`FromStr`) and so defers to the inner type's parser.
#[derive(Debug, FromStr)]
pub struct JsonPath(serde_json_path::JsonPath);

// Lets a `JsonPath` be used as a template function argument; the macro comes
// from `slumber_template` and presumably builds on the derived `FromStr`.
impl_try_from_value_str!(JsonPath);
/// A template value that has been converted to JSON, for use as the input to
/// JSON query functions.
pub struct JsonValue(serde_json::Value);
impl TryFromValue for JsonValue {
    /// Convert a template value to JSON.
    ///
    /// Strings and bytes are *parsed* as JSON text rather than being wrapped
    /// as JSON strings. Scalars map to their JSON equivalents, and
    /// arrays/objects are converted element by element.
    ///
    /// # Errors
    /// Fails if a string/bytes value is not valid JSON, or if any nested
    /// element fails its own conversion.
    fn try_from_value(value: Value) -> Result<Self, WithValue<ValueError>> {
        let json = match value {
            // Scalars have direct JSON equivalents
            Value::Null => serde_json::Value::Null,
            Value::Boolean(b) => serde_json::Value::from(b),
            Value::Integer(i) => serde_json::Value::from(i),
            Value::Float(f) => serde_json::Value::from(f),
            // Text is parsed as a JSON document; on failure the offending
            // value is attached to the error
            Value::String(s) => serde_json::from_str(&s)
                .map_err(|error| WithValue::new(s.into(), error))?,
            Value::Bytes(b) => serde_json::from_slice(&b)
                .map_err(|error| WithValue::new(b.into(), error))?,
            // Containers convert recursively, stopping at the first failure
            Value::Array(array) => array
                .into_iter()
                .map(serde_json::Value::try_from_value)
                .collect::<Result<_, _>>()?,
            Value::Object(map) => map
                .into_iter()
                .map(|(k, v)| Ok((k, serde_json::Value::try_from_value(v)?)))
                .collect::<Result<_, _>>()?,
        };
        Ok(Self(json))
    }
}
/// How the results of a JSON query are shaped into a single template value.
/// The behavior of each mode is implemented in `get_values`.
#[derive(Copy, Clone, Debug, Default)]
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
pub enum JsonQueryMode {
    /// One result is returned as-is, multiple results become an array, and
    /// zero results is an error
    #[default]
    Auto,
    /// Exactly one result is required; zero or multiple is an error
    Single,
    /// Results are always returned as an array, even for zero or one result
    Array,
}
impl JsonQueryMode {
    /// Shape the results of a JSON query into one template value, according
    /// to this mode.
    ///
    /// `query` is only used to label errors.
    ///
    /// # Errors
    /// - `FunctionError::JsonQueryNoResults` for `Auto`/`Single` when there
    ///   are no results
    /// - `FunctionError::JsonQueryTooMany` for `Single` when there is more
    ///   than one result
    fn get_values<Iter>(
        self,
        query: String,
        values: Iter,
    ) -> Result<Value, FunctionError>
    where
        Iter: IntoIterator<Item = serde_json::Value>,
    {
        /// How many results the query produced, as far as we care
        enum Case {
            None,
            One,
            Many,
        }

        // Peek at most two items ahead to classify the count without
        // consuming anything
        let mut results = itertools::peek_nth(values);
        let case = if results.peek_nth(0).is_none() {
            Case::None
        } else if results.peek_nth(1).is_none() {
            Case::One
        } else {
            Case::Many
        };

        match (self, case) {
            (Self::Array, _) | (Self::Auto, Case::Many) => {
                Ok(Value::Array(results.map(Value::from_json).collect()))
            }
            (Self::Auto | Self::Single, Case::One) => {
                Ok(Value::from_json(results.next().unwrap()))
            }
            (Self::Auto | Self::Single, Case::None) => {
                Err(FunctionError::JsonQueryNoResults { query })
            }
            (Self::Single, Case::Many) => {
                Err(FunctionError::JsonQueryTooMany {
                    query,
                    actual_count: results.count(),
                })
            }
        }
    }
}
impl FromStr for JsonQueryMode {
    type Err = String;

    /// Parse a mode from its lowercase name: `auto`, `single`, or `array`.
    /// Anything else (including different casing) is an error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mode = match s {
            "auto" => Self::Auto,
            "single" => Self::Single,
            "array" => Self::Array,
            _ => {
                return Err(format!(
                    "Invalid mode `{s}`; must be `array`, `single`, or `auto`"
                ));
            }
        };
        Ok(mode)
    }
}

// Lets a `JsonQueryMode` be used as a template function argument; the macro
// comes from `slumber_template` and presumably builds on `FromStr` above.
impl_try_from_value_str!(JsonQueryMode);
/// Convert a string to lowercase, using `str::to_lowercase`.
#[template]
pub fn lower(value: String) -> String {
    value.to_lowercase()
}
/// Ask the user for a text value via the context's prompter.
///
/// `message` defaults to an empty string. `default` and `sensitive` are
/// forwarded to the prompter unchanged. When `sensitive` is set, the reply is
/// additionally passed through `mask_sensitive` before being returned.
///
/// # Errors
/// Returns `FunctionError::PromptNoReply` if the prompter yields no reply.
#[template]
pub async fn prompt(
    #[context] context: &TemplateContext,
    #[kwarg] message: Option<String>,
    #[kwarg] default: Option<String>,
    #[kwarg] sensitive: bool,
) -> Result<String, FunctionError> {
    let message = message.unwrap_or_default();
    let answer = context
        .prompter
        .prompt_text(message, default, sensitive)
        .await;
    let Some(reply) = answer else {
        return Err(FunctionError::PromptNoReply);
    };

    Ok(if sensitive {
        mask_sensitive(context, reply)
    } else {
        reply
    })
}
/// Replace occurrences of `from` in `value` with `to`.
///
/// - `regex`: treat `from` as a regular expression instead of a literal
///   string. `to` is then handled by the regex crate's replacement logic.
/// - `n`: maximum number of replacements. `None` replaces every occurrence;
///   `0` replaces nothing, in both literal and regex mode.
///
/// # Errors
/// Returns an error if `regex` is set and `from` is not a valid regular
/// expression. This is checked even when `n` is `0`.
#[template]
pub fn replace(
    from: String,
    to: String,
    value: String,
    #[kwarg] regex: bool,
    #[kwarg] n: Option<u32>,
) -> Result<String, FunctionError> {
    if regex {
        let pattern = Regex::new(&from)?;
        let replaced = match n {
            // `Regex::replacen` treats a limit of 0 as "replace all", which
            // is the opposite of `str::replacen` in the literal branch. Handle
            // it explicitly so `n=0` means "no replacements" in both modes.
            Some(0) => return Ok(value),
            Some(n) => pattern.replacen(&value, n as usize, to),
            None => pattern.replace_all(&value, to),
        };
        Ok(replaced.into_owned())
    } else {
        Ok(match n {
            Some(n) => value.replacen(&from, &to, n as usize),
            None => value.replace(&from, &to),
        })
    }
}
/// Get the body of the most recent response for a recipe.
///
/// The response is obtained from `context.get_latest_response`, with
/// `trigger` forwarded to it unchanged.
///
/// # Errors
/// Propagates any error from `context.get_latest_response`.
#[template]
pub async fn response(
    #[context] context: &TemplateContext,
    recipe_id: RecipeId,
    #[kwarg] trigger: RequestTrigger,
) -> Result<Bytes, FunctionError> {
    let response = context.get_latest_response(&recipe_id, trigger).await?;
    // Take the body without copying if we hold the only reference to the
    // response; otherwise fall back to cloning it
    let body = Arc::try_unwrap(response)
        .map(|owned| owned.body)
        .unwrap_or_else(|shared| shared.body.clone());
    Ok(body.into_bytes())
}
#[template]
pub async fn response_header(
#[context] context: &TemplateContext,
recipe_id: RecipeId,
header: String,
#[kwarg] trigger: RequestTrigger,
) -> Result<Bytes, FunctionError> {
let response = context.get_latest_response(&recipe_id, trigger).await?;
let header_value = match Arc::try_unwrap(response) {
Ok(mut response) => response.headers.remove(&header),
Err(response) => response.headers.get(&header).cloned(),
}
.ok_or_else(|| FunctionError::ResponseMissingHeader { header })?;
Ok(header_value.as_bytes().to_vec().into())
}
/// Ask the user to pick one value from a list, via the context's prompter.
///
/// `message` defaults to an empty string.
///
/// # Errors
/// - `FunctionError::SelectNoOptions` if `options` is empty; the prompter is
///   not invoked in that case
/// - `FunctionError::PromptNoReply` if the prompter yields no selection
#[template]
pub async fn select(
    #[context] context: &TemplateContext,
    options: Vec<SelectOption>,
    #[kwarg] message: Option<String>,
) -> Result<Value, FunctionError> {
    if options.is_empty() {
        return Err(FunctionError::SelectNoOptions);
    }

    let message = message.unwrap_or_default();
    match context.prompter.prompt_select(message, options).await {
        Some(choice) => Ok(choice),
        None => Err(FunctionError::PromptNoReply),
    }
}
impl TryFromValue for SelectOption {
fn try_from_value(value: Value) -> Result<Self, WithValue<ValueError>> {
match value {
Value::Object(ref map) => {
Self::deserialize(map.into_deserializer())
.map_err(|error| WithValue::new(value, error))
}
value => Ok(Self {
label: value.clone().try_into_string()?,
value,
}),
}
}
}
/// Mark a value as sensitive. The value is passed through `mask_sensitive`,
/// which hides it unless the context is configured to show sensitive values.
#[template]
pub fn sensitive(
    #[context] context: &TemplateContext,
    value: String,
) -> String {
    mask_sensitive(context, value)
}
/// Extract the range `[start, stop)` from a string, bytes, or array.
///
/// - `stop` defaults to the length of the sequence
/// - Both bounds are capped at the length, then normalized by
///   `Sequence::wrap_index`, which maps negative indexes onto positions
///   counted from the end
/// - If the normalized `stop` is before `start`, the result is empty
///
/// Strings are sliced by character, not by byte. The result has the same
/// kind (string/bytes/array) as the input.
#[template]
pub fn slice(start: i64, stop: Option<i64>, sequence: Sequence) -> Sequence {
    let len = sequence.len();
    let begin = sequence.wrap_index(start.min(len));
    let end = sequence.wrap_index(stop.map_or(len, |stop| stop.min(len)));

    // An inverted range selects nothing; model that as the empty range so
    // every kind of sequence can share the code below
    let range = if end < begin { 0..0 } else { begin..end };

    match sequence {
        Sequence::String(string) => {
            let selected = string
                .chars()
                .skip(range.start)
                .take(range.len())
                .collect::<String>();
            Sequence::String(selected)
        }
        Sequence::Bytes(bytes) => Sequence::Bytes(bytes.slice(range)),
        Sequence::Array(mut array) => {
            Sequence::Array(array.drain(range).collect::<Vec<_>>())
        }
    }
}
/// Split a string on a separator.
///
/// `n` is the maximum number of splits to perform, so the result has at most
/// `n + 1` elements and the final element holds the unsplit remainder. `0`
/// returns the whole string as a single element; `None` splits on every
/// occurrence of the separator.
#[template]
pub fn split(
    separator: String,
    value: String,
    #[kwarg] n: Option<u32>,
) -> Vec<String> {
    match n {
        Some(n) => {
            // `splitn` takes the max number of *pieces*, which is one more
            // than the number of splits. Widen before adding so that
            // `n == u32::MAX` can't overflow; saturate in case `usize` is no
            // wider than `u32`.
            let max_pieces = (n as usize).saturating_add(1);
            value
                .splitn(max_pieces, &separator)
                .map(String::from)
                .collect()
        }
        None => value.split(&separator).map(String::from).collect(),
    }
}
/// Convert a value to a string, using `String`'s `TryFromValue` conversion.
///
/// # Errors
/// Returns the underlying [`ValueError`] if the conversion fails. The value
/// that was attached to the error for context is dropped.
#[template]
pub fn string(value: Value) -> Result<String, ValueError> {
    match String::try_from_value(value) {
        Ok(string) => Ok(string),
        Err(error) => Err(error.into_error()),
    }
}
/// Strip whitespace from the start, end, or both ends of a string, as
/// selected by `mode`.
#[template]
pub fn trim(value: String, #[kwarg] mode: TrimMode) -> String {
    let trimmed = match mode {
        TrimMode::Start => value.trim_start(),
        TrimMode::End => value.trim_end(),
        TrimMode::Both => value.trim(),
    };
    trimmed.to_string()
}
/// Convert a string to uppercase, using `str::to_uppercase`.
#[template]
pub fn upper(value: String) -> String {
    value.to_uppercase()
}
/// Hide a sensitive value unless the context says to show it.
///
/// When hidden, every character of the value is replaced with `•`, so the
/// mask has the same number of characters as the original.
fn mask_sensitive(context: &TemplateContext, value: String) -> String {
    if !context.show_sensitive {
        return "•".repeat(value.chars().count());
    }
    value
}
/// Condition passed to `TemplateContext::get_latest_response` by the
/// `response`/`response_header` functions. How each variant is interpreted is
/// decided there, not in this file.
#[derive(Copy, Clone, Debug, Default)]
pub enum RequestTrigger {
    /// Parsed from `"never"`; this is the default
    #[default]
    Never,
    /// Parsed from `"no_history"`
    NoHistory,
    /// Parsed from a duration string such as `"1h"`
    Expire { duration: TimeSpan },
    /// Parsed from `"always"`
    Always,
}
impl FromStr for RequestTrigger {
    type Err = String;

    /// Parse a trigger from `"never"`, `"no_history"`, `"always"`, or a
    /// duration string. Anything that isn't one of the keywords is parsed as
    /// a `TimeSpan`; if that fails, the error lists every accepted form.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let trigger = match s {
            "never" => Self::Never,
            "no_history" => Self::NoHistory,
            "always" => Self::Always,
            other => match other.parse::<TimeSpan>() {
                Ok(duration) => Self::Expire { duration },
                // The parser's own error is discarded in favor of one that
                // describes all valid inputs, not just durations
                Err(_) => {
                    return Err(String::from(
                        "Expected \"never\", \"no_history\", \"always\", or a \
                        duration string such as \"1h\" \
                        (units are \"s\", \"m\", \"h\", or \"d\")",
                    ));
                }
            },
        };
        Ok(trigger)
    }
}

// Lets a `RequestTrigger` be used as a template function argument; the macro
// comes from `slumber_template` and presumably builds on `FromStr` above.
impl_try_from_value_str!(RequestTrigger);
/// Which end(s) of a string the `trim` function strips whitespace from.
#[derive(Copy, Clone, Debug, Default)]
pub enum TrimMode {
    /// Strip only leading whitespace
    Start,
    /// Strip only trailing whitespace
    End,
    /// Strip both leading and trailing whitespace; this is the default
    #[default]
    Both,
}

impl FromStr for TrimMode {
    type Err = String;

    /// Parse a mode from its lowercase name: `start`, `end`, or `both`.
    /// Anything else (including different casing) is an error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mode = match s {
            "start" => Self::Start,
            "end" => Self::End,
            "both" => Self::Both,
            _ => {
                return Err(format!(
                    "Invalid mode `{s}`; must be `start`, `end`, or `both`"
                ));
            }
        };
        Ok(mode)
    }
}
// Lets a `TrimMode` be used as a template function argument. The macro is
// imported from `slumber_template`; presumably it implements `TryFromValue`
// by parsing the value's string form with `FromStr` - confirm in that crate.
impl_try_from_value_str!(TrimMode);
impl TryFromValue for RecipeId {
    /// Convert a value to a recipe ID by first converting it to a string.
    /// Fails only if the string conversion fails.
    fn try_from_value(value: Value) -> Result<Self, WithValue<ValueError>> {
        let id = String::try_from_value(value)?;
        Ok(RecipeId::from(id))
    }
}
/// A value that can be indexed or sliced: the argument type shared by the
/// `index` and `slice` functions.
#[derive(Debug)]
enum Sequence {
    /// Indexed by character, not by byte
    String(String),
    /// Indexed by byte
    Bytes(Bytes),
    /// Indexed by element
    Array(Vec<Value>),
}
impl Sequence {
    /// Number of indexable elements: characters for a string, bytes for
    /// bytes, elements for an array.
    ///
    /// Returned as `i64` so it can be compared directly against the signed
    /// indexes that template functions accept.
    fn len(&self) -> i64 {
        let len = match self {
            Self::String(string) => string.chars().count(),
            Self::Bytes(bytes) => bytes.len(),
            Self::Array(array) => array.len(),
        };
        len as i64
    }

    /// Convert a possibly-negative index into a non-negative position.
    ///
    /// - Non-negative indexes are returned unchanged. They are NOT
    ///   bounds-checked, so callers must compare the result against `len()`
    ///   themselves.
    /// - Negative indexes wrap around from the end using Euclidean remainder,
    ///   so `-1` is the last element.
    /// - For an empty sequence there is nothing to wrap around, so any
    ///   negative index resolves to `0`.
    fn wrap_index(&self, index: i64) -> usize {
        if index >= 0 {
            // Saturate rather than truncate on targets where `usize` is
            // narrower than `i64`, so an oversized index stays out of bounds
            return usize::try_from(index).unwrap_or(usize::MAX);
        }

        let len = self.len();
        if len == 0 {
            // `rem_euclid(0)` would panic, and casting the negative index
            // with `as usize` would yield a huge position that makes range
            // operations in callers panic. 0 is the only position that is
            // valid as a range bound for an empty sequence.
            return 0;
        }
        // Result is in `[0, len)`, and `len` originated from a `usize`
        index.rem_euclid(len) as usize
    }
}
impl TryFromValue for Sequence {
fn try_from_value(value: Value) -> Result<Self, WithValue<ValueError>> {
match value {
Value::String(string) => Ok(Self::String(string)),
Value::Bytes(bytes) => Ok(Self::Bytes(bytes)),
Value::Array(array) => Ok(Self::Array(array)),
_ => Err(WithValue::new(
value,
ValueError::Type {
expected: Expected::OneOf(&[
&Expected::String,
&Expected::Bytes,
&Expected::Array,
]),
},
)),
}
}
}
impl From<Sequence> for Value {
    /// Unwrap a sequence back into the matching kind of template value
    fn from(value: Sequence) -> Self {
        match value {
            Sequence::String(string) => Self::String(string),
            Sequence::Bytes(bytes) => Self::Bytes(bytes),
            Sequence::Array(array) => Self::Array(array),
        }
    }
}
/// Adapt an async reader into a stream of byte chunks, converting any I/O
/// error into a [`RenderError`].
fn reader_stream(
    reader: impl AsyncRead,
) -> impl Stream<Item = Result<Bytes, RenderError>> {
    let chunks = ReaderStream::new(reader);
    chunks.map_err(RenderError::other)
}