SHORS

Shors - a library for creating transport layer for distributed systems built with tarantool-module.
Shors contains four components:
- http router
- rpc router
- rpc client
- builtin components like:
- middlewares (opentelemetry, metrics, etc)
- logger
HTTP
Create http route
Use a route::Builder for create http routes. After route created just register it with http Server.
Example:
use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request};
use shors::transport::Context;
use std::error::Error;
fn make_http_endpoint() {
let endpoint = Builder::new()
.with_method("GET")
.with_path("/concat/a/:a/b/:b")
.build(
|_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
let a = request
.stash
.get("a")
.map(|s| s.as_str())
.unwrap_or_default();
let b = request
.stash
.get("b")
.map(|s| s.as_str())
.unwrap_or_default();
Ok(a.to_string() + b)
},
);
let s = server::Server::new();
s.register(Box::new(endpoint));
}
A more complex example (with groups, error handling, custom and builtin middlewares):
use once_cell::sync::Lazy;
use opentelemetry::sdk::export::trace::stdout;
use opentelemetry::sdk::trace::Tracer;
use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request, Response};
use shors::transport::Context;
use shors::{middleware, shors_error};
use std::error::Error;
static OPENTELEMETRY_TRACER: Lazy<Tracer> = Lazy::new(|| stdout::new_pipeline().install_simple());
pub fn make_http_endpoints() {
let route_group = Builder::new()
.with_path("/v1")
.with_error_handler(|ctx, err| {
shors_error!(ctx: ctx, "http error {}", err);
Response {
status: 501,
body: "request fail".to_string(),
}
})
.with_middleware(|route| {
println!("got new http request!");
route
})
.with_middleware(middleware::http::otel(&OPENTELEMETRY_TRACER))
.group();
#[derive(serde::Deserialize)]
struct EchoRequest {
text: String,
}
let echo = route_group
.builder()
.with_method("POST")
.with_path("/echo")
.build(
|_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
let req: EchoRequest = request.parse()?;
Ok(req.text)
},
);
let ping = route_group
.builder()
.with_method("GET")
.with_path("/ping")
.build(|_ctx: &mut Context, _request: Request| -> Result<_, Box<dyn Error>> { Ok("pong") });
let s = server::Server::new();
s.register(Box::new(echo));
s.register(Box::new(ping));
}
RPC
Prepare
Rpc transport required exported stored procedure - rpc_handler.
Create stored procedure. Example (where RPC_SERVER is the Server instance):
#[no_mangle]
pub extern "C" fn rpc_handler(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
RPC_SERVER.with(|srv| srv.handle(ctx, args))
}
And export it from cartridge role. Example:
box.schema.func.create('mylib.rpc_handler', { language = 'C', if_not_exists = true })
rawset(_G, 'rpc_handler', function(path, ctx, ...)
return box.func['mylib.rpc_handler']:call({ path, ctx, { ... } })
end)
Create rpc routes
Working with rpc routes same as http: use route::Builder for creating rpc routes. After route created register it with rpc Server.
Complex example:
use once_cell::unsync::Lazy;
use shors::log::RequestIdOwner;
use shors::transport::rpc::server::Server;
use shors::transport::{rpc, Context};
use std::error::Error;
use shors::{middleware, shors_error};
thread_local! {
pub static RPC_SERVER: Lazy<Server> = Lazy::new(Server::new);
}
#[tarantool::proc]
fn init_rpc() -> tarantool::Result<()> {
let routes = rpc::route::Builder::new()
.with_error_handler(|ctx, err| {
shors_error!(ctx: ctx, "rpc error {}", err);
})
.with_middleware(|route| {
println!("got new rpc request!");
route
})
.with_middleware(middleware::rpc::otel(&OPENTELEMETRY_TRACER))
.group();
let sum_route = routes.builder().with_path("/sum").build(
|_ctx: &mut Context, req: rpc::Request| -> Result<_, Box<dyn Error>> {
let numbers = req.parse::<(Vec<u64>,)>()?.0;
Ok(numbers.into_iter().sum::<u64>())
},
);
RPC_SERVER.with(|srv| {
srv.register(Box::new(sum_route));
});
Ok(())
}
RPC client
There is a special component for interaction with remote rpc endpoints. Currently, client can
call rpc endpoint in four modes:
- by bucket_id (vshard)
- by bucket_id (vshard) async (without waiting for an response)
- by replicaset id (call current master)
- by cartridge role (call current master)
Prepare
Rpc client require register some lua code in luaopen_ function (or in any init function).
Example:
#[no_mangle]
pub unsafe extern "C" fn luaopen_libstub(l: *mut ffi_lua::lua_State) -> c_int {
let lua = tlua::StaticLua::from_static(l);
shors::init_lua_functions(&lua).unwrap();
1
}
Call rpc endpoint by bucket_id
let lua = tarantool::lua_state();
let params = vec![2, 3, 4];
let resp = rpc::client::Builder::new(&lua)
.shard_endpoint("/add")
.call(&mut Context::background(), bucket_id, ¶ms)?;
Call rpc endpoint by bucket_id async
let lua = tarantool::lua_state();
rpc::client::Builder::new(&lua)
.async_shard_endpoint("/ping")
.call(&mut Context::background(), bucket_id, &())?;
Call rpc endpoint by replicaset uuid
let lua = tarantool::lua_state();
let params = vec![2, 3, 4];
let resp = rpc::client::Builder::new(&lua)
.replicaset_endpoint("/add")
.call(&mut Context::background(), rs_uuid, ¶ms)?;
Call rpc endpoint by cartridge role
NOTE: calling rpc endpoint by role require register exported rpc_handler stored procedure as
exported role method. For example:
return {
role_name = 'app.roles.stub',
...
rpc_handler = function(path, ctx, ...)
return box.func['libstub.rpc_handle']:call({ path, ctx, { ... } })
end,
}
Call example:
let lua = tarantool::lua_state();
let resp = rpc::client::Builder::new(&lua)
.role_endpoint("app.roles.stub", "/ping")
.call(&mut Context::background(), &())?;
Builtin middlewares
- http server
- debug - print debug information in debug logs
- otel - opentelemetry tracing
- otel_conditional - opentelemetry tracing, disabled if http-header
with-trace not set
- rpc server
- debug - print debug information in debug logs
- otel - opentelemetry tracing
- record_latency - record route latency as prometheus metric
- rpc client
- otel - opentelemetry tracing
- retry - retry call on server side errors
- record_latency - record call latency as prometheus metric
Testing
Unit
make unit-test
Integration
(cd tests/testapplication/ && cartridge build)
make int-test