use bytes::Bytes;
use cel_cxx::Opaque;
use chrono::Datelike;
use hyper::{Request, Response};
use salvo::http::uri::Scheme;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use wasmtime_wasi_http::p3::{Request as WasiRequest, Response as WasiResponse};
use crate::{
events::content::InboundContent, wasm::bindgen::witmproxy::plugin::capabilities::RequestContext,
};
/// Host/port pair made available to CEL expressions as an opaque value.
///
/// NOTE(review): presumably describes the target of an HTTP `CONNECT` — confirm against callers.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Opaque)]
#[cel_cxx(display)]
pub struct CelConnect {
    /// Host component of the target.
    pub host: String,
    /// Port component of the target.
    pub port: u16,
}

impl CelConnect {
    /// Borrows the host as a string slice.
    pub fn host(&self) -> &str {
        self.host.as_str()
    }

    /// Returns the port by value.
    pub fn port(&self) -> u16 {
        let Self { port, .. } = self;
        *port
    }
}
/// Request metadata made available to CEL expressions as an opaque value.
///
/// `query` and `headers` are multimaps: each name maps to the list of values
/// collected for it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Opaque)]
#[cel_cxx(display)]
pub struct CelRequest {
    /// URI scheme as a string.
    pub scheme: String,
    /// Host the request is addressed to.
    pub host: String,
    /// Path component of the request target.
    pub path: String,
    /// Query parameters, name to values.
    pub query: HashMap<String, Vec<String>>,
    /// HTTP method as a string.
    pub method: String,
    /// Request headers, name to values.
    pub headers: HashMap<String, Vec<String>>,
}

impl CelRequest {
    /// Borrows the scheme.
    pub fn scheme(&self) -> &str {
        self.scheme.as_str()
    }

    /// Borrows the host.
    pub fn host(&self) -> &str {
        self.host.as_str()
    }

    /// Borrows the path.
    pub fn path(&self) -> &str {
        self.path.as_str()
    }

    /// Borrows the query-parameter multimap.
    pub fn query(&self) -> &HashMap<String, Vec<String>> {
        let Self { query, .. } = self;
        query
    }

    /// Borrows the method.
    pub fn method(&self) -> &str {
        self.method.as_str()
    }

    /// Borrows the header multimap.
    pub fn headers(&self) -> &HashMap<String, Vec<String>> {
        let Self { headers, .. } = self;
        headers
    }
}
impl From<CelRequest> for RequestContext {
fn from(val: CelRequest) -> Self {
let query = val
.query
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let headers = val
.headers
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
RequestContext {
scheme: val.scheme,
host: val.host,
path: val.path,
query,
method: val.method,
headers,
}
}
}
impl From<&RequestContext> for CelRequest {
fn from(ctx: &RequestContext) -> Self {
let query = ctx
.query
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let headers = ctx
.headers
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
CelRequest {
scheme: ctx.scheme.clone(),
host: ctx.host.clone(),
path: ctx.path.clone(),
query,
method: ctx.method.clone(),
headers,
}
}
}
/// Builds a [`CelRequest`] from a WASI HTTP request.
///
/// * Every header name gets an entry; a value is appended only when its
///   `to_str()` conversion succeeds, so a name may map to an empty list.
/// * A missing authority yields an empty `host`; a missing scheme falls back
///   to `Scheme::HTTPS`.
/// * A missing path-and-query yields an empty `path` and an empty `query`.
/// * The query string is parsed with `url::form_urlencoded::parse`; repeated
///   keys accumulate their values in order of appearance.
impl From<&WasiRequest> for CelRequest {
    fn from(req: &WasiRequest) -> Self {
        let mut headers: HashMap<String, Vec<String>> = HashMap::new();
        for (name, value) in req.headers.iter() {
            // Create the entry first so the name is present even if the value is skipped.
            let values = headers.entry(name.as_str().to_owned()).or_default();
            if let Ok(text) = value.to_str() {
                values.push(text.to_owned());
            }
        }

        let host = match &req.authority {
            Some(authority) => authority.to_string(),
            None => String::new(),
        };
        let scheme = match &req.scheme {
            Some(scheme) => scheme.to_string(),
            None => Scheme::HTTPS.to_string(),
        };
        let method = req.method.to_string();

        let mut path = String::new();
        let mut query: HashMap<String, Vec<String>> = HashMap::new();
        if let Some(target) = &req.path_with_query {
            path.push_str(target.path());
            if let Some(raw_query) = target.query() {
                for (key, value) in url::form_urlencoded::parse(raw_query.as_bytes()) {
                    query
                        .entry(key.into_owned())
                        .or_default()
                        .push(value.into_owned());
                }
            }
        }

        Self {
            scheme,
            host,
            path,
            query,
            method,
            headers,
        }
    }
}
/// Builds a [`CelRequest`] from a borrowed hyper [`Request`].
///
/// * Every header name gets an entry; a value is appended only when its
///   `to_str()` conversion succeeds, so a name may map to an empty list.
/// * `host` is the URI authority rendered as a string, or empty when the URI
///   has none. The `Host` header is not consulted.
///   NOTE(review): origin-form request targets carry no authority — confirm
///   callers always pass absolute URIs if `host` must be populated.
/// * `scheme` falls back to `"https"` when the URI carries no scheme.
/// * The query string is parsed with `url::form_urlencoded::parse`; repeated
///   keys accumulate their values in order of appearance.
impl<B> From<&Request<B>> for CelRequest
where
    B: http_body::Body<Data = Bytes> + Send + 'static,
{
    fn from(req: &Request<B>) -> Self {
        let mut headers: HashMap<String, Vec<String>> = HashMap::new();
        for (name, value) in req.headers().iter() {
            // Create the entry first so the name is present even if the value is skipped.
            let values = headers.entry(name.as_str().to_owned()).or_default();
            if let Ok(text) = value.to_str() {
                values.push(text.to_owned());
            }
        }

        let uri = req.uri();
        let host = uri
            .authority()
            .map(|authority| authority.to_string())
            .unwrap_or_default();
        let scheme = uri.scheme_str().unwrap_or("https").to_owned();
        // `Method` is stringified through the reference; no clone is needed.
        let method = req.method().to_string();

        let mut path = String::new();
        let mut query: HashMap<String, Vec<String>> = HashMap::new();
        if let Some(target) = uri.path_and_query() {
            path.push_str(target.path());
            if let Some(raw_query) = target.query() {
                for (key, value) in url::form_urlencoded::parse(raw_query.as_bytes()) {
                    query
                        .entry(key.into_owned())
                        .or_default()
                        .push(value.into_owned());
                }
            }
        }

        Self {
            scheme,
            host,
            path,
            query,
            method,
            headers,
        }
    }
}
/// Response metadata made available to CEL expressions as an opaque value.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Opaque)]
#[cel_cxx(display)]
pub struct CelResponse {
    /// Numeric HTTP status code.
    pub status: u16,
    /// Response headers, name to values.
    pub headers: HashMap<String, Vec<String>>,
}

impl CelResponse {
    /// Returns the numeric status code.
    pub fn status(&self) -> u16 {
        let Self { status, .. } = self;
        *status
    }

    /// Borrows the header multimap.
    pub fn headers(&self) -> &HashMap<String, Vec<String>> {
        let Self { headers, .. } = self;
        headers
    }
}
/// Builds a [`CelResponse`] from a borrowed hyper [`Response`].
///
/// Every header name gets an entry; a value is appended only when its
/// `to_str()` conversion succeeds, so a name may map to an empty list.
impl<B> From<&Response<B>> for CelResponse
where
    B: http_body::Body<Data = Bytes> + Send + 'static,
{
    fn from(res: &Response<B>) -> Self {
        let headers = res.headers().iter().fold(
            HashMap::<String, Vec<String>>::new(),
            |mut collected, (name, value)| {
                // Create the entry first so the name is present even if the value is skipped.
                let values = collected.entry(name.as_str().to_owned()).or_default();
                if let Ok(text) = value.to_str() {
                    values.push(text.to_owned());
                }
                collected
            },
        );

        Self {
            status: res.status().as_u16(),
            headers,
        }
    }
}
impl From<&WasiResponse> for CelResponse {
fn from(res: &WasiResponse) -> Self {
let mut headers = HashMap::new();
for (name, value) in res.headers.iter() {
let entry = headers
.entry(name.as_str().to_string())
.or_insert_with(Vec::new);
if let Ok(val_str) = value.to_str() {
entry.push(val_str.to_string());
}
}
CelResponse {
status: res.status.as_u16(),
headers,
}
}
}
/// Builds a [`CelRequest`] from a borrowed `reqwest::Request`.
///
/// * Every header name gets an entry; a value is appended only when its
///   `to_str()` conversion succeeds, so a name may map to an empty list.
/// * `host` comes from `Url::host_str()`, or is empty when the URL has no host.
///   NOTE(review): the hyper and WASI conversions in this file stringify the
///   whole authority instead — confirm rules are meant to see the same `host`
///   from every source.
/// * Query pairs come from `Url::query_pairs()`; repeated keys accumulate
///   their values in order of appearance.
impl From<&reqwest::Request> for CelRequest {
    fn from(req: &reqwest::Request) -> Self {
        let mut headers: HashMap<String, Vec<String>> = HashMap::new();
        for (name, value) in req.headers().iter() {
            // Create the entry first so the name is present even if the value is skipped.
            let values = headers.entry(name.as_str().to_owned()).or_default();
            if let Ok(text) = value.to_str() {
                values.push(text.to_owned());
            }
        }

        let url = req.url();

        let mut query: HashMap<String, Vec<String>> = HashMap::new();
        for (key, value) in url.query_pairs() {
            query
                .entry(key.into_owned())
                .or_default()
                .push(value.into_owned());
        }

        Self {
            scheme: url.scheme().to_owned(),
            host: url.host_str().unwrap_or("").to_owned(),
            path: url.path().to_owned(),
            query,
            method: req.method().to_string(),
            headers,
        }
    }
}
/// Content metadata made available to CEL expressions as an opaque value.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Opaque)]
#[cel_cxx(display)]
pub struct CelContent {
    /// Content type string; private, read through [`CelContent::content_type`].
    content_type: String,
}

impl CelContent {
    /// Borrows the content type.
    pub fn content_type(&self) -> &str {
        self.content_type.as_str()
    }
}
impl From<&InboundContent> for CelContent {
fn from(content: &InboundContent) -> Self {
CelContent {
content_type: content.content_type(),
}
}
}
/// Builds a [`CelContent`] from the `content-type` header of a hyper [`Response`].
///
/// The content type is `"unknown"` when the header is absent or when its
/// value fails the `to_str()` conversion.
impl<B> From<&Response<B>> for CelContent
where
    B: http_body::Body<Data = Bytes> + Send + 'static,
{
    fn from(res: &Response<B>) -> Self {
        let content_type = res
            .headers()
            .get("content-type")
            .and_then(|value| value.to_str().ok())
            .unwrap_or("unknown")
            .to_owned();
        Self { content_type }
    }
}
/// Snapshot of a UTC instant exposed to CEL expressions as an opaque value.
///
/// `hour` and `day_of_week` are derived from `now_utc` when the value is
/// built by `CelTime::now`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Opaque)]
#[cel_cxx(display)]
pub struct CelTime {
/// Hour of `now_utc` (UTC), as returned by `hour()` on the timestamp.
hour: u32,
/// Weekday of `now_utc`, numbered Sunday = 0 through Saturday = 6.
day_of_week: u32,
/// The captured UTC timestamp.
now_utc: chrono::DateTime<chrono::Utc>,
}
impl CelTime {
    /// Captures the current UTC instant together with its hour and weekday.
    ///
    /// `day_of_week` is numbered from Sunday (`0`) through Saturday (`6`).
    pub fn now() -> Self {
        let now_utc = chrono::Utc::now();
        Self {
            hour: now_utc.hour(),
            day_of_week: now_utc.weekday().num_days_from_sunday(),
            now_utc,
        }
    }

    /// Reports whether the cron schedule `cron_str` fires within the 60
    /// seconds following the captured instant.
    ///
    /// The schedule is evaluated relative to the stored `now_utc`, not the
    /// wall clock at call time, so the answer is stable for a given
    /// `CelTime` value. An expression that fails to parse yields `false`.
    pub fn matches_cron(&self, cron_str: &str) -> bool {
        let Ok(schedule) = cron::Schedule::from_str(cron_str) else {
            return false;
        };
        // `after` yields fire times later than `now_utc`, so the difference
        // below is non-negative and needs no absolute value.
        schedule.after(&self.now_utc).next().is_some_and(|next| {
            next.signed_duration_since(self.now_utc).num_seconds() < 60
        })
    }

    /// Reports whether the captured weekday equals `weekday`
    /// (Sunday = 0 through Saturday = 6).
    ///
    /// Values outside `0..=6` never match.
    pub fn is_day_of_week(&self, weekday: i64) -> bool {
        (0..=6).contains(&weekday) && i64::from(self.day_of_week) == weekday
    }

    /// Reports whether the captured hour lies in the inclusive range
    /// `hour_start..=hour_end`.
    ///
    /// When `hour_start > hour_end` the range wraps past midnight (for
    /// example `22` to `3` matches 22, 23, 0, 1, 2 and 3). Either bound
    /// outside `0..=23` yields `false`.
    pub fn is_between_hours(&self, hour_start: i64, hour_end: i64) -> bool {
        let valid_hours = 0..=23;
        if !valid_hours.contains(&hour_start) || !valid_hours.contains(&hour_end) {
            return false;
        }
        let hour = i64::from(self.hour);
        if hour_start <= hour_end {
            (hour_start..=hour_end).contains(&hour)
        } else {
            // Wrapping window: late-evening part or early-morning part.
            hour >= hour_start || hour <= hour_end
        }
    }

    /// Declares the `time` variable of type [`CelTime`] on `env` and
    /// registers `matches_cron`, `is_day_of_week` and `is_between_hours` as
    /// member functions.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the builder while declaring the
    /// variable or registering a function.
    pub fn register_cel_env(
        env: cel_cxx::EnvBuilder<'_>,
    ) -> anyhow::Result<cel_cxx::EnvBuilder<'_>> {
        let env = env
            .declare_variable::<CelTime>("time")?
            .register_member_function("matches_cron", CelTime::matches_cron)?
            .register_member_function("is_day_of_week", CelTime::is_day_of_week)?
            .register_member_function("is_between_hours", CelTime::is_between_hours)?;
        Ok(env)
    }
}
use chrono::Timelike;