#![cfg_attr(docsrs, feature(doc_cfg))]
pub mod builtin;
pub mod convert;
mod helper;
use std::{
env,
fmt::Display,
marker::PhantomData,
panic::{self, RefUnwindSafe},
str::FromStr,
};
use ureq::{Agent, AgentBuilder, Response};
/// Per-invocation information handed to the function together with the event.
///
/// The two limits and the request id are parsed from the headers of the
/// invocation fetched from the runtime API; the remaining fields are copied
/// from the environment snapshot held by the runtime (`EnvContext`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Context {
/// Value of the `memory_limit_in_mb` header of the invocation.
pub memory_limit_in_mb: usize,
/// Value of the `time_limit_in_ms` header of the invocation.
pub time_limit_in_ms: usize,
/// Value of the `request_id` header of the invocation.
pub request_id: String,
/// Value of the `SCF_NAMESPACE` environment variable.
pub namespace: String,
/// Value of the `SCF_FUNCTIONNAME` environment variable.
pub function_name: String,
/// Value of the `SCF_FUNCTIONVERSION` environment variable.
pub function_version: String,
/// Value of the `TENCENTCLOUD_REGION` environment variable.
pub region: String,
/// Value of the `TENCENTCLOUD_APPID` environment variable.
pub appid: String,
/// Value of the `TENCENTCLOUD_UIN` environment variable.
pub uin: String,
}
impl Context {
fn new(
memory_limit_in_mb: usize,
time_limit_in_ms: usize,
request_id: String,
env_context: &EnvContext,
) -> Self {
Self {
memory_limit_in_mb,
time_limit_in_ms,
request_id,
namespace: env_context.namespace.clone(),
function_name: env_context.function_name.clone(),
function_version: env_context.function_version.clone(),
region: env_context.region.clone(),
appid: env_context.appid.clone(),
uin: env_context.uin.clone(),
}
}
}
/// Snapshot of the function-identifying environment variables.
///
/// Filled once by `EnvContext::load` and copied into every `Context`.
#[derive(Debug, Clone)]
struct EnvContext {
/// Value of `SCF_NAMESPACE`.
namespace: String,
/// Value of `SCF_FUNCTIONNAME`.
function_name: String,
/// Value of `SCF_FUNCTIONVERSION`.
function_version: String,
/// Value of `TENCENTCLOUD_REGION`.
region: String,
/// Value of `TENCENTCLOUD_APPID`.
appid: String,
/// Value of `TENCENTCLOUD_UIN`.
uin: String,
}
impl EnvContext {
    /// Reads the function identity from the process environment.
    ///
    /// # Panics
    ///
    /// Panics if any of `SCF_NAMESPACE`, `SCF_FUNCTIONNAME`,
    /// `SCF_FUNCTIONVERSION`, `TENCENTCLOUD_REGION`, `TENCENTCLOUD_APPID` or
    /// `TENCENTCLOUD_UIN` is unset or not valid Unicode. The panic message
    /// names the offending variable.
    fn load() -> Self {
        Self {
            namespace: Self::required_var("SCF_NAMESPACE"),
            function_name: Self::required_var("SCF_FUNCTIONNAME"),
            function_version: Self::required_var("SCF_FUNCTIONVERSION"),
            region: Self::required_var("TENCENTCLOUD_REGION"),
            appid: Self::required_var("TENCENTCLOUD_APPID"),
            uin: Self::required_var("TENCENTCLOUD_UIN"),
        }
    }

    /// Returns the value of environment variable `name`, panicking with a
    /// message that identifies the variable when it cannot be read.
    fn required_var(name: &str) -> String {
        env::var(name).unwrap_or_else(|err| {
            panic!("fail to read environment variable {}: {}", name, err)
        })
    }
}
/// A function that can be served by this runtime.
///
/// [`start`] and [`start_uncatched`] accept any implementor; a plain closure
/// can be adapted with [`make_scf`].
pub trait ServerlessComputeFunction {
/// Input of the function. The entry points require it to implement
/// `convert::FromReader` so it can be built from the invocation body.
type Event;
/// Successful output of the function. The entry points require it to
/// implement `convert::IntoBytes` so it can be posted back.
type Response;
/// Failure output of the function. The entry points require it to implement
/// `Display`; its text is sent to the error endpoint.
type Error;
/// Handles one event with the context of the current invocation.
fn call(&self, event: Self::Event, context: Context) -> Result<Self::Response, Self::Error>;
}
/// Adapter that lets a closure of shape `Fn(Event, Context) -> Result<Response, Error>`
/// act as a [`ServerlessComputeFunction`]. Created by [`make_scf`].
#[doc(hidden)]
pub struct Closure<Event, Response, Error, Function> {
/// The wrapped closure.
f: Function,
/// Marker tying the otherwise-unused `Event`, `Response` and `Error`
/// parameters to the type. `AssertUnwindSafe` is unwind-safe for any payload,
/// so this marker does not affect the struct's unwind-safety auto traits.
/// NOTE(review): presumably so that only `Function` decides whether the
/// `RefUnwindSafe` bound of `start` holds — confirm.
phantom: PhantomData<panic::AssertUnwindSafe<(Event, Response, Error)>>,
}
#[doc(hidden)]
impl<Event, Response, Error, Function> ServerlessComputeFunction
    for Closure<Event, Response, Error, Function>
where
    Function: Fn(Event, Context) -> Result<Response, Error>,
{
    type Event = Event;
    type Response = Response;
    type Error = Error;

    /// Forwards the event and context to the wrapped closure and returns its
    /// result unchanged.
    fn call(
        &self,
        event: Self::Event,
        context: Context,
    ) -> Result<Self::Response, Self::Error> {
        let handler = &self.f;
        handler(event, context)
    }
}
/// Wraps a closure so that it can be passed to the runtime entry points.
///
/// The returned [`Closure`] implements [`ServerlessComputeFunction`] by
/// calling `f` with the event and the invocation context.
pub fn make_scf<Event, Response, Error, Function>(
    f: Function,
) -> Closure<Event, Response, Error, Function>
where
    Function: Fn(Event, Context) -> Result<Response, Error> + 'static,
{
    // The marker carries no data; its type is inferred from the return type.
    let phantom = PhantomData;
    Closure { f, phantom }
}
/// Runs the serverless function, catching panics raised by it.
///
/// Builds the runtime, reports readiness, and then loops forever: fetch the
/// next invocation, call `f` under a panic guard, and post either the response
/// or an error message. This function never returns.
///
/// # Panics
///
/// Panics if the runtime cannot be set up or if talking to the runtime API
/// fails.
pub fn start<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(f: Function)
where
    Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>
        + RefUnwindSafe,
    Event: convert::FromReader<Error = ConvertEventError>,
    Response: convert::IntoBytes<Error = ConvertResponseError>,
    Error: Display,
    ConvertEventError: Display,
    ConvertResponseError: Display,
{
    let runtime = Runtime::new();
    runtime.notify_ready();
    loop {
        // `None` means the invocation could not be decoded; `next` has already
        // reported that, so just move on to the following one.
        let (event, context) = match runtime.next() {
            Some(invocation) => invocation,
            None => continue,
        };
        let outcome = runtime.invoke(&f, event, context);
        runtime.send_result(outcome);
    }
}
/// Runs the serverless function without catching panics raised by it.
///
/// Same loop as [`start`] — report readiness, then repeatedly fetch, invoke
/// and reply — but `f` is called directly, so a panic inside it unwinds
/// through this function. In exchange `f` does not have to be `RefUnwindSafe`.
/// This function never returns normally.
///
/// # Panics
///
/// Panics if the runtime cannot be set up, if talking to the runtime API
/// fails, or if `f` itself panics.
pub fn start_uncatched<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(
    f: Function,
) where
    Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>,
    Event: convert::FromReader<Error = ConvertEventError>,
    Response: convert::IntoBytes<Error = ConvertResponseError>,
    Error: Display,
    ConvertEventError: Display,
    ConvertResponseError: Display,
{
    let runtime = Runtime::new();
    runtime.notify_ready();
    loop {
        // `None` means the invocation could not be decoded; `next` has already
        // reported that, so just move on to the following one.
        let (event, context) = match runtime.next() {
            Some(invocation) => invocation,
            None => continue,
        };
        let outcome = runtime.invoke_uncatched(&f, event, context);
        runtime.send_result(outcome);
    }
}
/// Client for the runtime HTTP API plus the environment snapshot needed to
/// build a `Context` for each invocation.
struct Runtime {
/// HTTP agent used for every request to the runtime API.
agent: Agent,
/// Endpoint posted to once to signal readiness (`/runtime/init/ready`).
ready_url: String,
/// Endpoint fetched to obtain the next invocation (`/runtime/invocation/next`).
next_url: String,
/// Endpoint that receives successful responses (`/runtime/invocation/response`).
response_url: String,
/// Endpoint that receives error messages (`/runtime/invocation/error`).
error_url: String,
/// Environment values copied into every `Context`.
env_context: EnvContext,
}
impl Runtime {
    /// Builds the runtime from the process environment.
    ///
    /// Reads `SCF_RUNTIME_API` and `SCF_RUNTIME_API_PORT`, derives the four
    /// endpoint URLs from them and loads the [`EnvContext`].
    ///
    /// # Panics
    ///
    /// Panics if either variable is unset or not valid Unicode (the message
    /// names the variable), or if [`EnvContext::load`] panics.
    fn new() -> Self {
        let agent = AgentBuilder::new().build();
        let api_server = Self::required_var("SCF_RUNTIME_API");
        let api_port = Self::required_var("SCF_RUNTIME_API_PORT");
        // Every endpoint lives under the same prefix; build it once.
        let base_url = format!("http://{}:{}/runtime", api_server, api_port);
        Self {
            agent,
            ready_url: format!("{}/init/ready", base_url),
            next_url: format!("{}/invocation/next", base_url),
            response_url: format!("{}/invocation/response", base_url),
            error_url: format!("{}/invocation/error", base_url),
            env_context: EnvContext::load(),
        }
    }

    /// Returns the value of environment variable `name`, panicking with a
    /// message that identifies the variable when it cannot be read.
    fn required_var(name: &str) -> String {
        env::var(name).unwrap_or_else(|err| {
            panic!("fail to read environment variable {}: {}", name, err)
        })
    }

    /// Tells the runtime API that the function is ready to receive events.
    ///
    /// # Panics
    ///
    /// Panics if the request fails.
    #[inline]
    fn notify_ready(&self) {
        self.agent
            .post(&self.ready_url)
            .send_string(" ")
            .expect("fail to notify cloud about readiness");
    }

    /// Fetches the next invocation and splits it into event and context.
    ///
    /// Returns `None` when the invocation could not be decoded; in that case
    /// the reason has already been posted to the error endpoint.
    ///
    /// # Panics
    ///
    /// Panics if fetching the invocation or reporting the decode error fails.
    #[inline]
    fn next<Event, ConvertError>(&self) -> Option<(Event, Context)>
    where
        Event: convert::FromReader<Error = ConvertError>,
        ConvertError: Display,
    {
        let resp = self
            .agent
            .get(&self.next_url)
            .call()
            .expect("fail to retrieve next event from cloud");
        match self.break_parts(resp) {
            Ok(parts) => Some(parts),
            Err(err) => {
                self.send_error_message(&err);
                None
            }
        }
    }

    /// Calls `f` with the event and context, converting both a returned error
    /// and a panic into an error message.
    ///
    /// Returns `Ok(response)` on success, `Err("function failed with error: …")`
    /// when `f` returns an error, and the message obtained from the panic
    /// guard when `f` panics.
    fn invoke<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(
        &self,
        f: &Function,
        event: Event,
        context: Context,
    ) -> Result<Response, String>
    where
        Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>
            + RefUnwindSafe,
        Event: convert::FromReader<Error = ConvertEventError>,
        Response: convert::IntoBytes<Error = ConvertResponseError>,
        Error: Display,
        ConvertEventError: Display,
        ConvertResponseError: Display,
    {
        let invoke_result = {
            // The guard is created before the call and dropped at the end of
            // this scope, i.e. before the outcome is inspected below.
            let panic_guard = helper::PanicGuard::new();
            let caught = panic::catch_unwind({
                // The event type carries no unwind-safety bound, so it is
                // wrapped to satisfy `catch_unwind`.
                let event = panic::AssertUnwindSafe(event);
                move || f.call(event.0, context)
            })
            .map_err(|_| panic_guard.get_panic());
            caught
        };
        match invoke_result {
            Ok(Ok(response)) => Ok(response),
            Ok(Err(err)) => Err(format!("function failed with error: {}", err)),
            Err(message) => Err(message),
        }
    }

    /// Calls `f` with the event and context without catching panics.
    ///
    /// Returns `Ok(response)` on success and
    /// `Err("function failed with error: …")` when `f` returns an error.
    fn invoke_uncatched<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(
        &self,
        f: &Function,
        event: Event,
        context: Context,
    ) -> Result<Response, String>
    where
        Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>,
        Event: convert::FromReader<Error = ConvertEventError>,
        Response: convert::IntoBytes<Error = ConvertResponseError>,
        Error: Display,
        ConvertEventError: Display,
        ConvertResponseError: Display,
    {
        f.call(event, context)
            .map_err(|err| format!("function failed with error: {}", err))
    }

    /// Posts the outcome of an invocation back to the runtime API.
    ///
    /// A successful response is encoded with `into_bytes` and sent to the
    /// response endpoint. An encoding failure or an `Err` outcome is sent to
    /// the error endpoint instead.
    ///
    /// # Panics
    ///
    /// Panics if the request fails.
    fn send_result<Response, ConvertResponseError>(&self, result: Result<Response, String>)
    where
        Response: convert::IntoBytes<Error = ConvertResponseError>,
        ConvertResponseError: Display,
    {
        match result {
            Ok(response) => match response.into_bytes() {
                Ok(bytes) => {
                    self.agent
                        .post(&self.response_url)
                        .send_bytes(&bytes)
                        .expect("fail to send response to the cloud");
                }
                Err(err) => {
                    self.send_error_message(&format!("fail to encode function response: {}", err));
                }
            },
            Err(err) => {
                self.send_error_message(&err);
            }
        }
    }

    /// Splits a fetched invocation into the event and its context.
    ///
    /// The `memory_limit_in_mb`, `time_limit_in_ms` and `request_id` headers
    /// are parsed first; the body is then handed to `Event::from_reader`.
    ///
    /// # Errors
    ///
    /// Returns a description of the problem when a header is missing or
    /// malformed, or when the body cannot be converted into an event.
    fn break_parts<Event, ConvertError>(
        &self,
        invocation: Response,
    ) -> Result<(Event, Context), String>
    where
        Event: convert::FromReader<Error = ConvertError>,
        ConvertError: Display,
    {
        let memory_limit_in_mb = Self::parse_header(&invocation, "memory_limit_in_mb")?;
        let time_limit_in_ms = Self::parse_header(&invocation, "time_limit_in_ms")?;
        let request_id = Self::parse_header(&invocation, "request_id")?;
        // Headers must be read before this point: `into_reader` consumes the response.
        let reader = invocation.into_reader();
        let event = Event::from_reader(reader)
            .map_err(|e| format!("failed to parse incoming invocation payload: {}", e))?;
        Ok((
            event,
            Context::new(
                memory_limit_in_mb,
                time_limit_in_ms,
                request_id,
                &self.env_context,
            ),
        ))
    }

    /// Reads header `header` from `response` and parses it into `T`.
    ///
    /// # Errors
    ///
    /// Returns a message naming the header when it is absent or when its value
    /// does not parse as `T`.
    #[inline]
    fn parse_header<T, Error>(response: &Response, header: &str) -> Result<T, String>
    where
        T: FromStr<Err = Error>,
        Error: Display,
    {
        let value = response.header(header).ok_or_else(|| {
            format!(
                "header {} is not present in the incoming invocation",
                header
            )
        })?;
        value.parse().map_err(|e| {
            format!(
                "fail to parse value of header {} of incoming invocation: {}",
                header, e
            )
        })
    }

    /// Posts `message` to the error endpoint of the runtime API.
    ///
    /// # Panics
    ///
    /// Panics if the request fails.
    #[inline]
    fn send_error_message(&self, message: &str) {
        self.agent
            .post(&self.error_url)
            .send_string(message)
            .expect("fail to send error message to the cloud");
    }
}