shors 0.1.6

Transport layer for cartridge + tarantool-module projects.
Documentation

SHORS

Latest Version Docs badge

Shors - a library for creating transport layer for distributed systems built with tarantool-module. Shors contains four components:

  • http router
  • rpc router
  • rpc client
  • builtin components like:
    • middlewares (opentelemetry, metrics, etc)
    • logger

HTTP

Create http route

Use a route::Builder for create http routes. After route created just register it with http Server.

Example:

use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request};
use shors::transport::Context;
use std::error::Error;

fn make_http_endpoint() {
  let endpoint = Builder::new()
          .with_method("GET")
          .with_path("/concat/a/:a/b/:b")
          .build(
            |_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
              let a = request
                      .stash
                      .get("a")
                      .map(|s| s.as_str())
                      .unwrap_or_default();
              let b = request
                      .stash
                      .get("b")
                      .map(|s| s.as_str())
                      .unwrap_or_default();

              Ok(a.to_string() + b)
            },
          );

  let s = server::Server::new();
  s.register(Box::new(endpoint));
}

A more complex example (with groups, error handling, custom and builtin middlewares):

use once_cell::sync::Lazy;
use opentelemetry::sdk::export::trace::stdout;
use opentelemetry::sdk::trace::Tracer;
use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request, Response};
use shors::transport::Context;
use shors::{middleware, shors_error};
use std::error::Error;

static OPENTELEMETRY_TRACER: Lazy<Tracer> = Lazy::new(|| stdout::new_pipeline().install_simple());

pub fn make_http_endpoints() {
  let route_group = Builder::new()
          .with_path("/v1")
          .with_error_handler(|ctx, err| {
            shors_error!(ctx: ctx, "http error {}", err);
            Response {
              status: 501,
              body: "request fail".to_string(),
            }
          })
          .with_middleware(|route| {
            println!("got new http request!");
            route
          })
          .with_middleware(middleware::http::otel(&OPENTELEMETRY_TRACER))
          .group();

  #[derive(serde::Deserialize)]
  struct EchoRequest {
    text: String,
  }

  let echo = route_group
          .builder()
          .with_method("POST")
          .with_path("/echo")
          .build(
            |_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
              let req: EchoRequest = request.parse()?;
              Ok(req.text)
            },
          );

  let ping = route_group
          .builder()
          .with_method("GET")
          .with_path("/ping")
          .build(|_ctx: &mut Context, _request: Request| -> Result<_, Box<dyn Error>> { Ok("pong") });

  let s = server::Server::new();
  s.register(Box::new(echo));
  s.register(Box::new(ping));
}

RPC

Prepare

Rpc transport required exported stored procedure - rpc_handler.

Create stored procedure. Example (where RPC_SERVER is the Server instance):

#[no_mangle]
pub extern "C" fn rpc_handler(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
    RPC_SERVER.with(|srv| srv.handle(ctx, args))
}

And export it from cartridge role. Example:

    box.schema.func.create('mylib.rpc_handler', { language = 'C', if_not_exists = true })
    rawset(_G, 'rpc_handler', function(path, ctx, ...)
        return box.func['mylib.rpc_handler']:call({ path, ctx, { ... } })
    end)

Create rpc routes

Working with rpc routes same as http: use route::Builder for creating rpc routes. After route created register it with rpc Server.

Complex example:

use once_cell::unsync::Lazy;
use shors::log::RequestIdOwner;
use shors::transport::rpc::server::Server;
use shors::transport::{rpc, Context};
use std::error::Error;
use shors::{middleware, shors_error};

thread_local! {
    pub static RPC_SERVER: Lazy<Server> = Lazy::new(Server::new);
}

#[tarantool::proc]
fn init_rpc() -> tarantool::Result<()> {
  let routes = rpc::route::Builder::new()
          .with_error_handler(|ctx, err| {
            shors_error!(ctx: ctx, "rpc error {}", err);
          })
          .with_middleware(|route| {
            println!("got new rpc request!");
            route
          })
          .with_middleware(middleware::rpc::otel(&OPENTELEMETRY_TRACER))
          .group();
  
  let sum_route = routes.builder().with_path("/sum").build(
    |_ctx: &mut Context, req: rpc::Request| -> Result<_, Box<dyn Error>> {
      let numbers = req.parse::<(Vec<u64>,)>()?.0;
      Ok(numbers.into_iter().sum::<u64>())
    },
  );

  RPC_SERVER.with(|srv| {
    srv.register(Box::new(sum_route));
  });

  Ok(())
}

RPC client

There is a special component for interaction with remote rpc endpoints. Currently, client can call rpc endpoint in four modes:

  • by bucket_id (vshard)
  • by bucket_id (vshard) async (without waiting for an response)
  • by replicaset id (call current master)
  • by cartridge role (call current master)

Prepare

Rpc client require register some lua code in luaopen_ function (or in any init function).

Example:

#[no_mangle]
pub unsafe extern "C" fn luaopen_libstub(l: *mut ffi_lua::lua_State) -> c_int {
    let lua = tlua::StaticLua::from_static(l);
    shors::init_lua_functions(&lua).unwrap();
    1
}

Call rpc endpoint by bucket_id

    let lua = tarantool::lua_state();

    let params = vec![2, 3, 4];
    let resp = rpc::client::Builder::new(&lua)
        .shard_endpoint("/add")
        .call(&mut Context::background(), bucket_id, &params)?;

Call rpc endpoint by bucket_id async

    let lua = tarantool::lua_state();

    rpc::client::Builder::new(&lua)
        .async_shard_endpoint("/ping")
        .call(&mut Context::background(), bucket_id, &())?;

Call rpc endpoint by replicaset uuid

    let lua = tarantool::lua_state();

    let params = vec![2, 3, 4];
    let resp = rpc::client::Builder::new(&lua)
        .replicaset_endpoint("/add")
        .call(&mut Context::background(), rs_uuid, &params)?;

Call rpc endpoint by cartridge role

NOTE: calling rpc endpoint by role require register exported rpc_handler stored procedure as exported role method. For example:

return {
    role_name = 'app.roles.stub',
    ...
    rpc_handler = function(path, ctx, ...)
        return box.func['libstub.rpc_handle']:call({ path, ctx, { ... } })
    end,
}

Call example:

    let lua = tarantool::lua_state();

    let resp = rpc::client::Builder::new(&lua)
      .role_endpoint("app.roles.stub", "/ping")
      .call(&mut Context::background(), &())?;

Builtin middlewares

  • http server
    • debug - print debug information in debug logs
    • otel - opentelemetry tracing
    • otel_conditional - opentelemetry tracing, disabled if http-header with-trace not set
  • rpc server
    • debug - print debug information in debug logs
    • otel - opentelemetry tracing
    • record_latency - record route latency as prometheus metric
  • rpc client
    • otel - opentelemetry tracing
    • retry - retry call on server side errors
    • record_latency - record call latency as prometheus metric

Testing

Unit

  make unit-test

Integration

  (cd tests/testapplication/ && cartridge build)
  make int-test