tencent_scf/lib.rs
1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3//! A rust runtime for Tencent Cloud Serverless Compute Function
4//!
5//! This library provides a basic custom runtime for a rust application
6//! on Tencent Cloud as Serverless Compute Function. For complete setup,
7//! see [Setup].
8//!
9//! # `start` vs `start_uncatched`
10//! There are two varaints of `start` function to start the runtime, to sum up:
11//! * [`start`] catches panic from the provided serverless compute function, but this requires the
12//! function to be [`RefUnwindSafe`]. Most pure function should satisfy this.
13//! * [`start_uncatched`] allows the serverless compute function to panic the whole process and relies on
14//! the cloud to tear down the executor and restart. The downside of this is when a query causes
15//! panic on the function, the cloud typically waits until timeout before acknowledging the
16//! failure. Sometimes this is necessary to reference `!RefUnwindSafe` types. Connection pool,
17//! for example, usually is not unwind-safe.
18//!
19//! # Built-in Events
20//! When enabling the correspondent `builtin-*` feature, auto serialization and deserialization of
21//! certain built-in events and reponses are supported.
22//!
23//! Current supported built-in events/responses are:
24//! * API Gateway: enabled by feature `builtin-api-gateway`, supports trigger as described in [API Gateway
25//! Trigger]. `builtin::api::Request` and `builtin::api::Response` will be provided, which are
26//! re-export of `http::Request` and `http::Response` with support of auto deserialization/serialization.
27//!
28//! For more information, see [the module-level documentation][builtin::api].
29//! ```no_run
30//! # #[cfg(feature = "builtin-api-gateway")]
31//! # {
32//! use std::convert::Infallible;
33//!
34//! use tencent_scf::{
35//! builtin::api::{Request, Response, ResponseBuilder},
36//! make_scf, start, Context,
37//! };
38//!
39//! fn main() {
40//! let scf = make_scf(
41//! |event: Request<String>, _context: Context| -> Result<Response<String>, Infallible> {
42//! Ok(ResponseBuilder::new()
43//! .status(200)
44//! .body("Hello World".to_string())
45//! .unwrap())
46//! },
47//! );
48//! start(scf);
49//! }
50//! # }
51//! ```
52//!
53//! # Custom Event and Response
54//! Custom types of event and reponse are supported.
55//!
56//! ## Auto Serialization/Deserialization
57//! Any type that implements [`serde::Deserialize`] and is marked as [`convert::AsJson`] can be
58//! deserialized from JSON automatically when feature "json" is enabled. Similarly for types
59//! implementing [`serde::Serialize`] and is marked as [`convert::AsJson`].
60//!
61//! ### Example
62//! ```no_run
63//! use std::convert::Infallible;
64//!
65//! use serde::{Deserialize, Serialize};
66//! // with "json" feature enabled
67//! use tencent_scf::{convert::AsJson, make_scf, start, Context};
68//!
69//! // define a custom event
70//! #[derive(Deserialize)]
71//! struct CustomEvent {
72//! a: i32,
73//! b: i32,
74//! }
75//!
76//! // mark the custom event as json for auto deserialization
77//! impl AsJson for CustomEvent {}
78//!
79//! // define a custom response
80//! #[derive(Serialize)]
81//! struct CustomResponse {
82//! a_plus_b: i32,
83//! }
84//!
85//! // mark the custom response as json for auto serialization
86//! impl AsJson for CustomResponse {}
87//!
88//! // make the scf
89//! let scf = make_scf(
90//! |event: CustomEvent, _context: Context| -> Result<CustomResponse, Infallible> {
91//! Ok(CustomResponse {
92//! a_plus_b: event.a + event.b,
93//! })
94//! },
95//! );
96//! // start the runtime in the main function
97//! start(scf);
98//! ```
99//! ## Manual Serialization/Deserialization
100//! User can also chose to implement [`convert::FromReader`] for incoming events and
101//! [`convert::IntoBytes`] for outgoing response.
102//!
103//! [Setup]: https://github.com/johnmave126/tencent_scf/blob/master/README.md#deployment
104//! [API Gateway Trigger]: https://cloud.tencent.com/document/product/583/12513
105pub mod builtin;
106pub mod convert;
107
108mod helper;
109
110use std::{
111 env,
112 fmt::Display,
113 marker::PhantomData,
114 panic::{self, RefUnwindSafe},
115 str::FromStr,
116};
117
118use ureq::{Agent, AgentBuilder, Response};
119
120/// The context of the invocation, assembled from environment variables and invocation headers.
121///
122/// The concrete description of each field can be found at [Custom Runtime API]
123/// and [Built-in Environment Variables].
124///
125/// [Custom Runtime API]: https://cloud.tencent.com/document/product/583/47274#custom-runtime-.E8.BF.90.E8.A1.8C.E6.97.B6-api
126/// [Built-in Environment Variables]: https://cloud.tencent.com/document/product/583/30228#.E5.B7.B2.E5.86.85.E7.BD.AE.E7.8E.AF.E5.A2.83.E5.8F.98.E9.87.8F
127
128#[derive(Debug, Clone, PartialEq, Eq)]
129pub struct Context {
130 /// from header `memory_limit_in_mb`
131 pub memory_limit_in_mb: usize,
132 /// from header `time_limit_in_ms`
133 pub time_limit_in_ms: usize,
134 /// from header `request_id`
135 pub request_id: String,
136 /// from enviroment variable `SCF_NAMESPACE`
137 pub namespace: String,
138 /// from enviroment variables `SCF_FUNCTIONNAME`
139 pub function_name: String,
140 /// from enviroment variable `SCF_FUNCTIONVERSION`
141 pub function_version: String,
142 /// from enviroment variable `TENCENTCLOUD_REGION`
143 pub region: String,
144 /// from enviroment variable `TENCENTCLOUD_APPID`
145 pub appid: String,
146 /// from enviroment variable `TENCENTCLOUD_UIN`
147 pub uin: String,
148}
149
150impl Context {
151 /// Creates a context from values from header and from enviroment (via `env_context`).
152 fn new(
153 memory_limit_in_mb: usize,
154 time_limit_in_ms: usize,
155 request_id: String,
156 env_context: &EnvContext,
157 ) -> Self {
158 Self {
159 memory_limit_in_mb,
160 time_limit_in_ms,
161 request_id,
162 namespace: env_context.namespace.clone(),
163 function_name: env_context.function_name.clone(),
164 function_version: env_context.function_version.clone(),
165 region: env_context.region.clone(),
166 appid: env_context.appid.clone(),
167 uin: env_context.uin.clone(),
168 }
169 }
170}
171
172/// A subset of [`Context`] where the values are pulled from the environment variables
173#[derive(Debug, Clone)]
174struct EnvContext {
175 namespace: String,
176 function_name: String,
177 function_version: String,
178 region: String,
179 appid: String,
180 uin: String,
181}
182
183impl EnvContext {
184 /// Load values from environment variables. The names of the variables are retrieved from [Built-in Environment Variables]
185 ///
186 /// # Panics
187 /// The function will panic if any value is missing.
188 ///
189 /// [Built-in Environment Variables]: https://cloud.tencent.com/document/product/583/30228#.E5.B7.B2.E5.86.85.E7.BD.AE.E7.8E.AF.E5.A2.83.E5.8F.98.E9.87.8F
190 fn load() -> Self {
191 Self {
192 namespace: env::var("SCF_NAMESPACE").unwrap(),
193 function_name: env::var("SCF_FUNCTIONNAME").unwrap(),
194 function_version: env::var("SCF_FUNCTIONVERSION").unwrap(),
195 region: env::var("TENCENTCLOUD_REGION").unwrap(),
196 appid: env::var("TENCENTCLOUD_APPID").unwrap(),
197 uin: env::var("TENCENTCLOUD_UIN").unwrap(),
198 }
199 }
200}
201
202/// Main trait for a serverless compute function.
203///
204/// # Note
205/// Depending on whether the concrete type is unwind-safe, user should choose between [`start`]
206/// and [`start_uncatched`] to start the runtime. This trait **doesn't** imply `RefUnwindSafe`.
207///
208/// # Implement the Trait
209/// Using a closure should cover most of the cases. If a struct/enum is used, the trait can be
210/// implemented as:
211/// ```no_run
212/// use tencent_scf::{start, Context, ServerlessComputeFunction};
213/// struct MyFunction {
214/// some_attribute: String,
215/// }
216///
217/// impl ServerlessComputeFunction for MyFunction {
218/// // Type for input event
219/// type Event = String;
220/// // Type for output response
221/// type Response = String;
222/// // Type for possible error
223/// type Error = std::convert::Infallible;
224///
225/// // Implement the execution of the function
226/// fn call(
227/// &self,
228/// event: Self::Event,
229/// context: Context,
230/// ) -> Result<Self::Response, Self::Error> {
231/// // We just concatenate the strings
232/// Ok(event + &self.some_attribute)
233/// }
234/// }
235///
236/// start(MyFunction {
237/// some_attribute: "suffix".to_string(),
238/// });
239/// ```
240pub trait ServerlessComputeFunction {
241 /// The type for incoming event.
242 type Event;
243 /// The type for outgoing response.
244 type Response;
245 /// The type for error(s) during the execution of the function.
246 type Error;
247
248 /// The actual execution of the function. Implementer is allowed to panic as it will be catched
249 /// by the runtime.
250 fn call(&self, event: Self::Event, context: Context) -> Result<Self::Response, Self::Error>;
251}
252
253/// A wrapper struct to convert a closure into a [`ServerlessComputeFunction`].
254///
255/// The main reason we need this is to make sure we can use associative type in
256/// [`ServerlessComputeFunction`].
257#[doc(hidden)]
258pub struct Closure<Event, Response, Error, Function> {
259 f: Function,
260 phantom: PhantomData<panic::AssertUnwindSafe<(Event, Response, Error)>>,
261}
262
263#[doc(hidden)]
264impl<Event, Response, Error, Function> ServerlessComputeFunction
265 for Closure<Event, Response, Error, Function>
266where
267 Function: Fn(Event, Context) -> Result<Response, Error>,
268{
269 type Event = Event;
270 type Response = Response;
271 type Error = Error;
272 fn call(&self, event: Event, context: Context) -> Result<Response, Error> {
273 (&self.f)(event, context)
274 }
275}
276
277/// Create a [`ServerlessComputeFunction`] from a closure.
278///# Example
279/// ```
280/// # #[cfg(feature = "builtin-api-gateway")]
281/// # {
282/// use std::convert::Infallible;
283///
284/// use tencent_scf::{
285/// builtin::api::{Request, Response, ResponseBuilder},
286/// make_scf, Context,
287/// };
288///
289/// let scf = make_scf(
290/// |event: Request<String>, _context: Context| -> Result<Response<String>, Infallible> {
291/// Ok(ResponseBuilder::new()
292/// .status(200)
293/// .body("Hello World".to_string())
294/// .unwrap())
295/// },
296/// );
297/// # }
298/// ```
299pub fn make_scf<Event, Response, Error, Function>(
300 f: Function,
301) -> Closure<Event, Response, Error, Function>
302where
303 Function: Fn(Event, Context) -> Result<Response, Error> + 'static,
304{
305 Closure {
306 f,
307 phantom: PhantomData,
308 }
309}
310
311/// Start the runtime with the given serverless compute function.
312///
313/// Typically this should be the last call in the `main` function.
314///
315/// # Panic
316/// The runtime will panic if any of the following happens:
317/// * Fail to initialize, for example, expected environment vairable not found.
318/// * Fail to communicate to upstream cloud.
319/// * Receive malformed response from upstream cloud.
320///
321/// The runtime will **not** panic if the serverless compute function panics. Instead, the panic will
322/// be captured and properly sent to the upstream cloud. The runtime may not be able to capture
323/// **all** kinds of panic, see [`std::panic::catch_unwind`] for more information.
324///
325/// # Example
326/// ```no_run
327/// # #[cfg(feature = "json")]
328/// # {
329/// use std::convert::Infallible;
330///
331/// use serde::{Deserialize, Serialize};
332/// use tencent_scf::{convert::AsJson, make_scf, start, Context};
333///
334/// // define a custom event
335/// #[derive(Deserialize)]
336/// struct CustomEvent {
337/// a: i32,
338/// b: i32,
339/// }
340///
341/// // mark the custom event as json for auto deserialization
342/// impl AsJson for CustomEvent {}
343///
344/// // define a custom response
345/// #[derive(Serialize)]
346/// struct CustomResponse {
347/// a_plus_b: i32,
348/// }
349///
350/// // mark the custom response as json for auto serialization
351/// impl AsJson for CustomResponse {}
352///
353/// fn main() {
354/// // make the scf
355/// let scf = make_scf(
356/// |event: CustomEvent, _context: Context| -> Result<CustomResponse, Infallible> {
357/// Ok(CustomResponse {
358/// a_plus_b: event.a + event.b,
359/// })
360/// },
361/// );
362/// // start the runtime in the main function
363/// start(scf);
364/// }
365/// # }
366/// ```
367pub fn start<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(f: Function)
368where
369 Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>
370 + RefUnwindSafe,
371 Event: convert::FromReader<Error = ConvertEventError>,
372 Response: convert::IntoBytes<Error = ConvertResponseError>,
373 Error: Display,
374 ConvertEventError: Display,
375 ConvertResponseError: Display,
376{
377 let rt = Runtime::new();
378
379 // Notify upstream cloud that we are ready
380 rt.notify_ready();
381
382 // Main loop
383 loop {
384 // Fetch next event
385 if let Some((event, context)) = rt.next() {
386 // The response is well-formed
387 let result = rt.invoke(&f, event, context);
388 // Send the result to the cloud
389 rt.send_result(result);
390 }
391 }
392}
393
394/// Start the runtime with the given serverless compute function without catching panics.
395///
396/// Typically this should be the last call in the `main` function.
397///
398/// # Panic
399/// The runtime will panic if any of the following happens:
400/// * Fail to initialize, for example, expected environment vairable not found.
401/// * Fail to communicate to upstream cloud.
402/// * Receive malformed response from upstream cloud.
403/// * The serverless compute function panics during execution.
404///
405/// # Example
406/// Here is an example where the function must be started without panic catching:
407/// ```no_run
408/// use tencent_scf::{make_scf, start_uncatched, Context};
409/// use ureq::AgentBuilder;
410///
411/// // build an http agent
412/// // this object is not unwind-safe so any closure that captures it is not unwind-safe
413/// let agent = AgentBuilder::new().build();
414/// // make the scf
415/// let scf = make_scf(
416/// move |event: serde_json::Value,
417/// _context: Context|
418/// -> Result<serde_json::Value, ureq::Error> {
419/// // do something using agent
420/// let _resp = agent.get("http://example.com/").call()?;
421/// Ok(event)
422/// },
423/// );
424/// // start the runtime in the main function
425/// start_uncatched(scf);
426/// // this doesn't compile
427/// // tencent_scf::start(scf);
428/// ```
429pub fn start_uncatched<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(
430 f: Function,
431) where
432 Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>,
433 Event: convert::FromReader<Error = ConvertEventError>,
434 Response: convert::IntoBytes<Error = ConvertResponseError>,
435 Error: Display,
436 ConvertEventError: Display,
437 ConvertResponseError: Display,
438{
439 let rt = Runtime::new();
440
441 // Notify upstream cloud that we are ready
442 rt.notify_ready();
443
444 // Main loop
445 loop {
446 // Fetch next event
447 if let Some((event, context)) = rt.next() {
448 // The response is well-formed
449 let result = rt.invoke_uncatched(&f, event, context);
450 // Send the result to the cloud
451 rt.send_result(result);
452 }
453 }
454}
455
456/// A struct that contains information for a runtime
457struct Runtime {
458 agent: Agent,
459 ready_url: String,
460 next_url: String,
461 response_url: String,
462 error_url: String,
463 env_context: EnvContext,
464}
465
466impl Runtime {
467 /// Create a runtime
468 fn new() -> Self {
469 // HTTP client pool
470 let agent = AgentBuilder::new().build();
471
472 // Extract cloud runtime server and port from environment variables
473 let api_server = env::var("SCF_RUNTIME_API").unwrap();
474 let api_port = env::var("SCF_RUNTIME_API_PORT").unwrap();
475
476 // Assemble urls for upstream cloud communication
477 // Gathered from [Custom Runtime
478 // API](https://cloud.tencent.com/document/product/583/47274#custom-runtime-.E8.BF.90.E8.A1.8C.E6.97.B6-api)
479 let ready_url = format!("http://{}:{}/runtime/init/ready", api_server, api_port);
480 let next_url = format!("http://{}:{}/runtime/invocation/next", api_server, api_port);
481 let response_url = format!(
482 "http://{}:{}/runtime/invocation/response",
483 api_server, api_port
484 );
485 let error_url = format!(
486 "http://{}:{}/runtime/invocation/error",
487 api_server, api_port
488 );
489
490 // Load context from environment variables
491 let env_context = EnvContext::load();
492
493 Self {
494 agent,
495 ready_url,
496 next_url,
497 response_url,
498 error_url,
499 env_context,
500 }
501 }
502
503 /// Notify the upstream cloud that the runtime is ready
504 #[inline]
505 fn notify_ready(&self) {
506 // A space must be sent, otherwise the upstream cloud rejects the request.
507 self.agent
508 .post(&self.ready_url)
509 .send_string(" ")
510 .expect("fail to notify cloud about readiness");
511 }
512
513 /// Get the next event from upstream cloud
514 #[inline]
515 fn next<Event, ConvertError>(&self) -> Option<(Event, Context)>
516 where
517 Event: convert::FromReader<Error = ConvertError>,
518 ConvertError: Display,
519 {
520 let resp = self
521 .agent
522 .get(&self.next_url)
523 .call()
524 .expect("fail to retrieve next event from cloud");
525
526 match self.break_parts(resp) {
527 Ok(parts) => Some(parts),
528 Err(err) => {
529 // Fail to parse the response
530 self.send_error_message(&err);
531 None
532 }
533 }
534 }
535
536 /// Invoke a scf with panic catching
537 fn invoke<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(
538 &self,
539 f: &Function,
540 event: Event,
541 context: Context,
542 ) -> Result<Response, String>
543 where
544 Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>
545 + RefUnwindSafe,
546 Event: convert::FromReader<Error = ConvertEventError>,
547 Response: convert::IntoBytes<Error = ConvertResponseError>,
548 Error: Display,
549 ConvertEventError: Display,
550 ConvertResponseError: Display,
551 {
552 let invoke_result = {
553 // Replace the panic handler to redirect panic messages.
554 // This is a RAII construct so the panic handler should be reinstated after the
555 // end of this block
556 let panic_guard = helper::PanicGuard::new();
557 let invoke_result = panic::catch_unwind({
558 let scf = &f;
559 // The event was deserialized from a byte stream and *should not* affect the
560 // outer environment had a panic happens, but we cannot prevent crazy
561 // implementater for `convert::FromReader` where they somehow introduce
562 // `!UnwindSafe` types. Implementers are warned in the documentation for
563 // `convert::FromReader` so for ergonomics we assert unwind-safety for
564 // event
565 let event = panic::AssertUnwindSafe(event);
566 move || scf.call(event.0, context)
567 })
568 .map_err(|_| panic_guard.get_panic());
569 invoke_result
570 };
571 match invoke_result {
572 // The execution succeeded
573 Ok(Ok(response)) => Ok(response),
574 Ok(Err(err)) => {
575 // There are errors during the execution
576 Err(format!("function failed with error: {}", err))
577 }
578 Err(message) => {
579 // panic, now extract panic message from buffer
580 Err(message)
581 }
582 }
583 }
584
585 /// Invoke a scf without panic catching
586 fn invoke_uncatched<Event, Response, Error, ConvertEventError, ConvertResponseError, Function>(
587 &self,
588 f: &Function,
589 event: Event,
590 context: Context,
591 ) -> Result<Response, String>
592 where
593 Function: ServerlessComputeFunction<Event = Event, Response = Response, Error = Error>,
594 Event: convert::FromReader<Error = ConvertEventError>,
595 Response: convert::IntoBytes<Error = ConvertResponseError>,
596 Error: Display,
597 ConvertEventError: Display,
598 ConvertResponseError: Display,
599 {
600 f.call(event, context)
601 .map_err(|err| format!("function failed with error: {}", err))
602 }
603
604 /// Send the execution result to the upstream cloud
605 fn send_result<Response, ConvertResponseError>(&self, result: Result<Response, String>)
606 where
607 Response: convert::IntoBytes<Error = ConvertResponseError>,
608 ConvertResponseError: Display,
609 {
610 match result {
611 Ok(response) => match response.into_bytes() {
612 Ok(response) => {
613 // Send the result to the upstream
614 self.agent
615 .post(&self.response_url)
616 .send_bytes(&response)
617 .expect("fail to send response to the cloud");
618 }
619 Err(err) => {
620 // Fail to encode the response
621 self.send_error_message(&format!("fail to encode function response: {}", err));
622 }
623 },
624
625 Err(err) => {
626 self.send_error_message(&err);
627 }
628 }
629 }
630
631 /// Break a response for an event into payload and metadata, and try to parse the payload into
632 /// event and collect metadata to form a context.
633 #[doc(hidden)]
634 fn break_parts<Event, ConvertError>(
635 &self,
636 invocation: Response,
637 ) -> Result<(Event, Context), String>
638 where
639 Event: convert::FromReader<Error = ConvertError>,
640 ConvertError: Display,
641 {
642 // Name of the headers from: [Custom Runtime
643 // API](https://cloud.tencent.com/document/product/583/47274#custom-runtime-.E8.BF.90.E8.A1.8C.E6.97.B6-api)
644 let memory_limit_in_mb = Self::parse_header(&invocation, "memory_limit_in_mb")?;
645 let time_limit_in_ms = Self::parse_header(&invocation, "time_limit_in_ms")?;
646 let request_id = Self::parse_header(&invocation, "request_id")?;
647
648 let reader = invocation.into_reader();
649 let event = Event::from_reader(reader)
650 .map_err(|e| format!("failed to parse incoming invocation payload: {}", e))?;
651
652 Ok((
653 event,
654 Context::new(
655 memory_limit_in_mb,
656 time_limit_in_ms,
657 request_id,
658 &self.env_context,
659 ),
660 ))
661 }
662 /// A helper function to parse a header in the response. Returns an error when the header doesn't
663 /// exist or the parsing fails.
664 #[inline]
665 fn parse_header<T, Error>(response: &Response, header: &str) -> Result<T, String>
666 where
667 T: FromStr<Err = Error>,
668 Error: Display,
669 {
670 match response.header(header) {
671 Some(value) => value.parse().map_err(|e| {
672 format!(
673 "fail to parse value of header {} of incoming invocation: {}",
674 header, e
675 )
676 }),
677 None => Err(format!(
678 "header {} is not present in the incoming invocation",
679 header
680 )),
681 }
682 }
683
684 /// A helper function to send the error message to the upstream cloud
685 #[inline]
686 fn send_error_message(&self, message: &str) {
687 self.agent
688 .post(&self.error_url)
689 .send_string(message)
690 .expect("fail to send error message to the cloud");
691 }
692}