tencent_scf/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3//! A rust runtime for Tencent Cloud Serverless Compute Function
4//!
5//! This library provides a basic custom runtime for a rust application
6//! on Tencent Cloud as Serverless Compute Function. For complete setup,
7//! see [Setup].
8//!
9//! # `start` vs `start_uncatched`
10//! There are two varaints of `start` function to start the runtime, to sum up:
11//! * [`start`] catches panic from the provided serverless compute function, but this requires the
12//! function to be [`RefUnwindSafe`]. Most pure function should satisfy this.
13//! * [`start_uncatched`] allows the serverless compute function to panic the whole process and relies on
14//! the cloud to tear down the executor and restart. The downside of this is when a query causes
15//! panic on the function, the cloud typically waits until timeout before acknowledging the
16//! failure. Sometimes this is necessary to reference `!RefUnwindSafe` types. Connection pool,
17//! for example, usually is not unwind-safe.
18//!
19//! # Built-in Events
20//! When enabling the correspondent `builtin-*` feature, auto serialization and deserialization of
21//! certain built-in events and reponses are supported.
22//!
23//! Current supported built-in events/responses are:
24//! * API Gateway: enabled by feature `builtin-api-gateway`, supports trigger as described in [API Gateway
25//! Trigger]. `builtin::api::Request` and `builtin::api::Response` will be provided, which are
26//! re-export of `http::Request` and `http::Response` with support of auto deserialization/serialization.
27//!
28//!   For more information, see [the module-level documentation][builtin::api].
29//!   ```no_run
30//!   # #[cfg(feature = "builtin-api-gateway")]
31//!   # {
32//!   use std::convert::Infallible;
33//!  
34//!   use tencent_scf::{
35//!       builtin::api::{Request, Response, ResponseBuilder},
36//!       make_scf, start, Context,
37//!   };
38//!  
39//!   fn main() {
40//!       let scf = make_scf(
41//!           |event: Request<String>, _context: Context| -> Result<Response<String>, Infallible> {
42//!               Ok(ResponseBuilder::new()
43//!                   .status(200)
44//!                   .body("Hello World".to_string())
45//!                   .unwrap())
46//!           },
47//!       );
48//!       start(scf);
49//!   }
50//!   # }
51//!   ```
52//!
53//! # Custom Event and Response
54//! Custom types of event and reponse are supported.
55//!
56//! ## Auto Serialization/Deserialization
57//! Any type that implements [`serde::Deserialize`] and is marked as [`convert::AsJson`] can be
58//! deserialized from JSON automatically when feature "json" is enabled. Similarly for types
59//! implementing [`serde::Serialize`] and is marked as [`convert::AsJson`].
60//!
61//! ### Example
62//! ```no_run
63//! use std::convert::Infallible;
64//!
65//! use serde::{Deserialize, Serialize};
66//! // with "json" feature enabled
67//! use tencent_scf::{convert::AsJson, make_scf, start, Context};
68//!
69//! // define a custom event
70//! #[derive(Deserialize)]
71//! struct CustomEvent {
72//!     a: i32,
73//!     b: i32,
74//! }
75//!
76//! // mark the custom event as json for auto deserialization
77//! impl AsJson for CustomEvent {}
78//!
79//! // define a custom response
80//! #[derive(Serialize)]
81//! struct CustomResponse {
82//!     a_plus_b: i32,
83//! }
84//!
85//! // mark the custom response as json for auto serialization
86//! impl AsJson for CustomResponse {}
87//!
88//! // make the scf
89//! let scf = make_scf(
90//!     |event: CustomEvent, _context: Context| -> Result<CustomResponse, Infallible> {
91//!         Ok(CustomResponse {
92//!             a_plus_b: event.a + event.b,
93//!         })
94//!     },
95//! );
96//! // start the runtime in the main function
97//! start(scf);
98//! ```
99//! ## Manual Serialization/Deserialization
100//! User can also chose to implement [`convert::FromReader`] for incoming events and
101//! [`convert::IntoBytes`] for outgoing response.
102//!
103//! [Setup]: https://github.com/johnmave126/tencent_scf/blob/master/README.md#deployment
104//! [API Gateway Trigger]: https://cloud.tencent.com/document/product/583/12513
105pub mod builtin;
106pub mod convert;
107
108mod helper;
109
110use std::{
111    env,
112    fmt::Display,
113    marker::PhantomData,
114    panic::{self, RefUnwindSafe},
115    str::FromStr,
116};
117
118use ureq::{Agent, AgentBuilder, Response};
119
120/// The context of the invocation, assembled from environment variables and invocation headers.
121///
122/// The concrete description of each field can be found at [Custom Runtime API]
123/// and [Built-in Environment Variables].
124///
125/// [Custom Runtime API]: https://cloud.tencent.com/document/product/583/47274#custom-runtime-.E8.BF.90.E8.A1.8C.E6.97.B6-api
126/// [Built-in Environment Variables]: https://cloud.tencent.com/document/product/583/30228#.E5.B7.B2.E5.86.85.E7.BD.AE.E7.8E.AF.E5.A2.83.E5.8F.98.E9.87.8F
127
128#[derive(Debug, Clone, PartialEq, Eq)]
129pub struct Context {
130    /// from header `memory_limit_in_mb`
131    pub memory_limit_in_mb: usize,
132    /// from header `time_limit_in_ms`
133    pub time_limit_in_ms: usize,
134    /// from header `request_id`
135    pub request_id: String,
136    /// from enviroment variable `SCF_NAMESPACE`
137    pub namespace: String,
138    /// from enviroment variables `SCF_FUNCTIONNAME`
139    pub function_name: String,
140    /// from enviroment variable `SCF_FUNCTIONVERSION`
141    pub function_version: String,
142    /// from enviroment variable `TENCENTCLOUD_REGION`
143    pub region: String,
144    /// from enviroment variable `TENCENTCLOUD_APPID`
145    pub appid: String,
146    /// from enviroment variable `TENCENTCLOUD_UIN`
147    pub uin: String,
148}
149
150impl Context {
151    /// Creates a context from values from header and from enviroment (via `env_context`).
152    fn new(
153        memory_limit_in_mb: usize,
154        time_limit_in_ms: usize,
155        request_id: String,
156        env_context: &EnvContext,
157    ) -> Self {
158        Self {
159            memory_limit_in_mb,
160            time_limit_in_ms,
161            request_id,
162            namespace: env_context.namespace.clone(),
163            function_name: env_context.function_name.clone(),
164            function_version: env_context.function_version.clone(),
165            region: env_context.region.clone(),
166            appid: env_context.appid.clone(),
167            uin: env_context.uin.clone(),
168        }
169    }
170}
171
172/// A subset of [`Context`] where the values are pulled from the environment variables
173#[derive(Debug, Clone)]
174struct EnvContext {
175    namespace: String,
176    function_name: String,
177    function_version: String,
178    region: String,
179    appid: String,
180    uin: String,
181}
182
183impl EnvContext {
184    /// Load values from environment variables. The names of the variables are retrieved from [Built-in Environment Variables]
185    ///
186    /// # Panics
187    /// The function will panic if any value is missing.
188    ///
189    /// [Built-in Environment Variables]: https://cloud.tencent.com/document/product/583/30228#.E5.B7.B2.E5.86.85.E7.BD.AE.E7.8E.AF.E5.A2.83.E5.8F.98.E9.87.8F
190    fn load() -> Self {
191        Self {
192            namespace: env::var("SCF_NAMESPACE").unwrap(),
193            function_name: env::var("SCF_FUNCTIONNAME").unwrap(),
194            function_version: env::var("SCF_FUNCTIONVERSION").unwrap(),
195            region: env::var("TENCENTCLOUD_REGION").unwrap(),
196            appid: env::var("TENCENTCLOUD_APPID").unwrap(),
197            uin: env::var("TENCENTCLOUD_UIN").unwrap(),
198        }
199    }
200}
201
202/// Main trait for a serverless compute function.
203///
204/// # Note
205/// Depending on whether the concrete type is unwind-safe, user should choose between [`start`]
206/// and [`start_uncatched`] to start the runtime. This trait **doesn't** imply `RefUnwindSafe`.
207///
208/// # Implement the Trait
209/// Using a closure should cover most of the cases. If a struct/enum is used, the trait can be
210/// implemented as:
211/// ```no_run
212/// use tencent_scf::{start, Context, ServerlessComputeFunction};
213/// struct MyFunction {
214///     some_attribute: String,
215/// }
216///
217/// impl ServerlessComputeFunction for MyFunction {
218///     // Type for input event
219///     type Event = String;
220///     // Type for output response
221///     type Response = String;
222///     // Type for possible error
223///     type Error = std::convert::Infallible;
224///
225///     // Implement the execution of the function
226///     fn call(
227///         &self,
228///         event: Self::Event,
229///         context: Context,
230///     ) -> Result<Self::Response, Self::Error> {
231///         // We just concatenate the strings
232///         Ok(event + &self.some_attribute)
233///     }
234/// }
235///
236/// start(MyFunction {
237///     some_attribute: "suffix".to_string(),
238/// });
239/// ```
240pub trait ServerlessComputeFunction {
241    /// The type for incoming event.
242    type Event;
243    /// The type for outgoing response.
244    type Response;
245    /// The type for error(s) during the execution of the function.
246    type Error;
247
248    /// The actual execution of the function. Implementer is allowed to panic as it will be catched
249    /// by the runtime.
250    fn call(&self, event: Self::Event, context: Context) -> Result<Self::Response, Self::Error>;
251}
252
253/// A wrapper struct to convert a closure into a [`ServerlessComputeFunction`].
254///
255/// The main reason we need this is to make sure we can use associative type in
256/// [`ServerlessComputeFunction`].
257#[doc(hidden)]
258pub struct Closure<Event, Response, Error, Function> {
259    f: Function,
260    phantom: PhantomData<panic::AssertUnwindSafe<(Event, Response, Error)>>,
261}
262
263#[doc(hidden)]
264impl<Event, Response, Error, Function> ServerlessComputeFunction
265    for Closure<Event, Response, Error, Function>
266where
267    Function: Fn(Event, Context) -> Result<Response, Error>,
268{
269    type Event = Event;
270    type Response = Response;
271    type Error = Error;
272    fn call(&self, event: Event, context: Context) -> Result<Response, Error> {
273        (&self.f)(event, context)
274    }
275}
276
277/// Create a [`ServerlessComputeFunction`] from a closure.
278///# Example
279/// ```
280/// # #[cfg(feature = "builtin-api-gateway")]
281/// # {
282/// use std::convert::Infallible;
283///
284/// use tencent_scf::{
285///     builtin::api::{Request, Response, ResponseBuilder},
286///     make_scf, Context,
287/// };
288///
289/// let scf = make_scf(
290///     |event: Request<String>, _context: Context| -> Result<Response<String>, Infallible> {
291///         Ok(ResponseBuilder::new()
292///             .status(200)
293///             .body("Hello World".to_string())
294///             .unwrap())
295///     },
296/// );
297/// # }
298/// ```
299pub fn make_scf<Event, Response, Error, Function>(
300    f: Function,
301) -> Closure<Event, Response, Error, Function>
302where
303    Function: Fn(Event, Context) -> Result<Response, Error> + 'static,
304{
305    Closure {
306        f,
307        phantom: PhantomData,
308    }
309}
310
311/// Start the runtime with the given serverless compute function.
312///
313/// Typically this should be the last call in the `main` function.
314///
315/// # Panic
316/// The runtime will panic if any of the following happens:
317/// * Fail to initialize, for example, expected environment vairable not found.
318/// * Fail to communicate to upstream cloud.
319/// * Receive malformed response from upstream cloud.
320///
321/// The runtime will **not** panic if the serverless compute function panics. Instead, the panic will
322/// be captured and properly sent to the upstream cloud. The runtime may not be able to capture
323/// **all** kinds of panic, see [`std::panic::catch_unwind`] for more information.
324///
325/// # Example
326/// ```no_run
327/// # #[cfg(feature = "json")]
328/// # {
329/// use std::convert::Infallible;
330///
331/// use serde::{Deserialize, Serialize};
332/// use tencent_scf::{convert::AsJson, make_scf, start, Context};
333///
334/// // define a custom event
335/// #[derive(Deserialize)]
336/// struct CustomEvent {
337///     a: i32,
338///     b: i32,
339/// }
340///
341/// // mark the custom event as json for auto deserialization
342/// impl AsJson for CustomEvent {}
343///
344/// // define a custom response
345/// #[derive(Serialize)]
346/// struct CustomResponse {
347///     a_plus_b: i32,
348/// }
349///
350/// // mark the custom response as json for auto serialization
351/// impl AsJson for CustomResponse {}
352///
353/// fn main() {
354///     // make the scf
355///     let scf = make_scf(
356///         |event: CustomEvent, _context: Context| -> Result<CustomResponse, Infallible> {
357///             Ok(CustomResponse {
358///                 a_plus_b: event.a + event.b,
359///             })
360///         },
361///     );
362///     // start the runtime in the main function
363///     start(scf);
364/// }
365/// # }
366/// ```
367pub fn start<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(f: Function)
368where
369    Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>
370        + RefUnwindSafe,
371    Event: convert::FromReader<Error = ConvertEventError>,
372    Response: convert::IntoBytes<Error = ConvertResponseError>,
373    Error: Display,
374    ConvertEventError: Display,
375    ConvertResponseError: Display,
376{
377    let rt = Runtime::new();
378
379    // Notify upstream cloud that we are ready
380    rt.notify_ready();
381
382    // Main loop
383    loop {
384        // Fetch next event
385        if let Some((event, context)) = rt.next() {
386            // The response is well-formed
387            let result = rt.invoke(&f, event, context);
388            // Send the result to the cloud
389            rt.send_result(result);
390        }
391    }
392}
393
394/// Start the runtime with the given serverless compute function without catching panics.
395///
396/// Typically this should be the last call in the `main` function.
397///
398/// # Panic
399/// The runtime will panic if any of the following happens:
400/// * Fail to initialize, for example, expected environment vairable not found.
401/// * Fail to communicate to upstream cloud.
402/// * Receive malformed response from upstream cloud.
403/// * The serverless compute function panics during execution.
404///
405/// # Example
406/// Here is an example where the function must be started without panic catching:
407/// ```no_run
408/// use tencent_scf::{make_scf, start_uncatched, Context};
409/// use ureq::AgentBuilder;
410///
411/// // build an http agent
412/// // this object is not unwind-safe so any closure that captures it is not unwind-safe
413/// let agent = AgentBuilder::new().build();
414/// // make the scf
415/// let scf = make_scf(
416///     move |event: serde_json::Value,
417///           _context: Context|
418///           -> Result<serde_json::Value, ureq::Error> {
419///         // do something using agent
420///         let _resp = agent.get("http://example.com/").call()?;
421///         Ok(event)
422///     },
423/// );
424/// // start the runtime in the main function
425/// start_uncatched(scf);
426/// // this doesn't compile
427/// // tencent_scf::start(scf);
428/// ```
429pub fn start_uncatched<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(
430    f: Function,
431) where
432    Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>,
433    Event: convert::FromReader<Error = ConvertEventError>,
434    Response: convert::IntoBytes<Error = ConvertResponseError>,
435    Error: Display,
436    ConvertEventError: Display,
437    ConvertResponseError: Display,
438{
439    let rt = Runtime::new();
440
441    // Notify upstream cloud that we are ready
442    rt.notify_ready();
443
444    // Main loop
445    loop {
446        // Fetch next event
447        if let Some((event, context)) = rt.next() {
448            // The response is well-formed
449            let result = rt.invoke_uncatched(&f, event, context);
450            // Send the result to the cloud
451            rt.send_result(result);
452        }
453    }
454}
455
456/// A struct that contains information for a runtime
457struct Runtime {
458    agent: Agent,
459    ready_url: String,
460    next_url: String,
461    response_url: String,
462    error_url: String,
463    env_context: EnvContext,
464}
465
466impl Runtime {
467    /// Create a runtime
468    fn new() -> Self {
469        // HTTP client pool
470        let agent = AgentBuilder::new().build();
471
472        // Extract cloud runtime server and port from environment variables
473        let api_server = env::var("SCF_RUNTIME_API").unwrap();
474        let api_port = env::var("SCF_RUNTIME_API_PORT").unwrap();
475
476        // Assemble urls for upstream cloud communication
477        // Gathered from [Custom Runtime
478        // API](https://cloud.tencent.com/document/product/583/47274#custom-runtime-.E8.BF.90.E8.A1.8C.E6.97.B6-api)
479        let ready_url = format!("http://{}:{}/runtime/init/ready", api_server, api_port);
480        let next_url = format!("http://{}:{}/runtime/invocation/next", api_server, api_port);
481        let response_url = format!(
482            "http://{}:{}/runtime/invocation/response",
483            api_server, api_port
484        );
485        let error_url = format!(
486            "http://{}:{}/runtime/invocation/error",
487            api_server, api_port
488        );
489
490        // Load context from environment variables
491        let env_context = EnvContext::load();
492
493        Self {
494            agent,
495            ready_url,
496            next_url,
497            response_url,
498            error_url,
499            env_context,
500        }
501    }
502
503    /// Notify the upstream cloud that the runtime is ready
504    #[inline]
505    fn notify_ready(&self) {
506        // A space must be sent, otherwise the upstream cloud rejects the request.
507        self.agent
508            .post(&self.ready_url)
509            .send_string(" ")
510            .expect("fail to notify cloud about readiness");
511    }
512
513    /// Get the next event from upstream cloud
514    #[inline]
515    fn next<Event, ConvertError>(&self) -> Option<(Event, Context)>
516    where
517        Event: convert::FromReader<Error = ConvertError>,
518        ConvertError: Display,
519    {
520        let resp = self
521            .agent
522            .get(&self.next_url)
523            .call()
524            .expect("fail to retrieve next event from cloud");
525
526        match self.break_parts(resp) {
527            Ok(parts) => Some(parts),
528            Err(err) => {
529                // Fail to parse the response
530                self.send_error_message(&err);
531                None
532            }
533        }
534    }
535
536    /// Invoke a scf with panic catching
537    fn invoke<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(
538        &self,
539        f: &Function,
540        event: Event,
541        context: Context,
542    ) -> Result<Response, String>
543    where
544        Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>
545            + RefUnwindSafe,
546        Event: convert::FromReader<Error = ConvertEventError>,
547        Response: convert::IntoBytes<Error = ConvertResponseError>,
548        Error: Display,
549        ConvertEventError: Display,
550        ConvertResponseError: Display,
551    {
552        let invoke_result = {
553            // Replace the panic handler to redirect panic messages.
554            // This is a RAII construct so the panic handler should be reinstated after the
555            // end of this block
556            let panic_guard = helper::PanicGuard::new();
557            let invoke_result = panic::catch_unwind({
558                let scf = &f;
559                // The event was deserialized from a byte stream and *should not* affect the
560                // outer environment had a panic happens, but we cannot prevent crazy
561                // implementater for `convert::FromReader` where they somehow introduce
562                // `!UnwindSafe` types. Implementers are warned in the documentation for
563                // `convert::FromReader` so for ergonomics we assert unwind-safety for
564                // event
565                let event = panic::AssertUnwindSafe(event);
566                move || scf.call(event.0, context)
567            })
568            .map_err(|_| panic_guard.get_panic());
569            invoke_result
570        };
571        match invoke_result {
572            // The execution succeeded
573            Ok(Ok(response)) => Ok(response),
574            Ok(Err(err)) => {
575                // There are errors during the execution
576                Err(format!("function failed with error: {}", err))
577            }
578            Err(message) => {
579                // panic, now extract panic message from buffer
580                Err(message)
581            }
582        }
583    }
584
585    /// Invoke a scf without panic catching
586    fn invoke_uncatched<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(
587        &self,
588        f: &Function,
589        event: Event,
590        context: Context,
591    ) -> Result<Response, String>
592    where
593        Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>,
594        Event: convert::FromReader<Error = ConvertEventError>,
595        Response: convert::IntoBytes<Error = ConvertResponseError>,
596        Error: Display,
597        ConvertEventError: Display,
598        ConvertResponseError: Display,
599    {
600        f.call(event, context)
601            .map_err(|err| format!("function failed with error: {}", err))
602    }
603
604    /// Send the execution result to the upstream cloud
605    fn send_result<Response, ConvertResponseError>(&self, result: Result<Response, String>)
606    where
607        Response: convert::IntoBytes<Error = ConvertResponseError>,
608        ConvertResponseError: Display,
609    {
610        match result {
611            Ok(response) => match response.into_bytes() {
612                Ok(response) => {
613                    // Send the result to the upstream
614                    self.agent
615                        .post(&self.response_url)
616                        .send_bytes(&response)
617                        .expect("fail to send response to the cloud");
618                }
619                Err(err) => {
620                    // Fail to encode the response
621                    self.send_error_message(&format!("fail to encode function response: {}", err));
622                }
623            },
624
625            Err(err) => {
626                self.send_error_message(&err);
627            }
628        }
629    }
630
631    /// Break a response for an event into payload and metadata, and try to parse the payload into
632    /// event and collect metadata to form a context.
633    #[doc(hidden)]
634    fn break_parts<Event, ConvertError>(
635        &self,
636        invocation: Response,
637    ) -> Result<(Event, Context), String>
638    where
639        Event: convert::FromReader<Error = ConvertError>,
640        ConvertError: Display,
641    {
642        // Name of the headers from: [Custom Runtime
643        // API](https://cloud.tencent.com/document/product/583/47274#custom-runtime-.E8.BF.90.E8.A1.8C.E6.97.B6-api)
644        let memory_limit_in_mb = Self::parse_header(&invocation, "memory_limit_in_mb")?;
645        let time_limit_in_ms = Self::parse_header(&invocation, "time_limit_in_ms")?;
646        let request_id = Self::parse_header(&invocation, "request_id")?;
647
648        let reader = invocation.into_reader();
649        let event = Event::from_reader(reader)
650            .map_err(|e| format!("failed to parse incoming invocation payload: {}", e))?;
651
652        Ok((
653            event,
654            Context::new(
655                memory_limit_in_mb,
656                time_limit_in_ms,
657                request_id,
658                &self.env_context,
659            ),
660        ))
661    }
662    /// A helper function to parse a header in the response. Returns an error when the header doesn't
663    /// exist or the parsing fails.
664    #[inline]
665    fn parse_header<T, Error>(response: &Response, header: &str) -> Result<T, String>
666    where
667        T: FromStr<Err = Error>,
668        Error: Display,
669    {
670        match response.header(header) {
671            Some(value) => value.parse().map_err(|e| {
672                format!(
673                    "fail to parse value of header {} of incoming invocation: {}",
674                    header, e
675                )
676            }),
677            None => Err(format!(
678                "header {} is not present in the incoming invocation",
679                header
680            )),
681        }
682    }
683
684    /// A helper function to send the error message to the upstream cloud
685    #[inline]
686    fn send_error_message(&self, message: &str) {
687        self.agent
688            .post(&self.error_url)
689            .send_string(message)
690            .expect("fail to send error message to the cloud");
691    }
692}