ember-client
async Rust client for ember, a high-performance distributed cache.
features
- typed API for all common commands (strings, keys, lists, hashes, sets, sorted sets, bitmaps, server, pub/sub, slowlog)
- pipelining — batch commands into a single round-trip
- optional TLS via the
tls cargo feature (enabled by default)
- optional vector commands via the
vector cargo feature
- binary-safe values: inputs accept
&str, String, Vec<u8>, or bytes::Bytes
- outputs return
Bytes — zero-copy where possible
- Subscriber type for pub/sub with typed message frames
quick start
[dependencies]
ember-client = "0.1"
tokio = { version = "1", features = ["full"] }
use ember_client::Client;
#[tokio::main]
async fn main() -> Result<(), ember_client::ClientError> {
let mut client = Client::connect("127.0.0.1", 6379).await?;
client.set("greeting", "hello").await?;
if let Some(value) = client.get("greeting").await? {
println!("{}", String::from_utf8_lossy(&value));
}
Ok(())
}
pipelining
Send multiple commands in a single network round-trip:
use ember_client::{Client, Pipeline};
#[tokio::main]
async fn main() -> Result<(), ember_client::ClientError> {
let mut client = Client::connect("127.0.0.1", 6379).await?;
let frames = client.execute_pipeline(
Pipeline::new()
.set("a", "1")
.set("b", "2")
.get("a")
.get("b")
.incr("counter")
.zadd("scores", &[(9.5, "alice"), (8.0, "bob")])
.hset("user:1", &[("name", "alice"), ("age", "30")]),
).await?;
println!("{} responses", frames.len());
Ok(())
}
commands
strings
| command |
signature |
returns |
get |
get(key) |
Option<Bytes> |
set |
set(key, value) |
() |
set_ex |
set_ex(key, value, seconds) |
() |
getdel |
getdel(key) |
Option<Bytes> |
getset |
getset(key, value) |
Option<Bytes> (old value) |
append |
append(key, value) |
i64 (new length) |
strlen |
strlen(key) |
i64 |
incr |
incr(key) |
i64 |
decr |
decr(key) |
i64 |
incrby |
incrby(key, delta) |
i64 |
decrby |
decrby(key, delta) |
i64 |
incr_by_float |
incr_by_float(key, delta) |
f64 |
mget |
mget(&[key, ...]) |
Vec<Option<Bytes>> |
mset |
mset(&[(key, value), ...]) |
() |
msetnx |
msetnx(&[(key, value), ...]) |
bool (false if any key exists) |
keys
| command |
signature |
returns |
del |
del(&[key, ...]) |
i64 (count deleted) |
unlink |
unlink(&[key, ...]) |
i64 |
exists |
exists(&[key, ...]) |
i64 (count found) |
expire |
expire(key, seconds) |
bool |
pexpire |
pexpire(key, millis) |
bool |
expireat |
expireat(key, timestamp) |
bool |
pexpireat |
pexpireat(key, timestamp_ms) |
bool |
persist |
persist(key) |
bool |
ttl |
ttl(key) |
i64 (-2 missing, -1 no expiry) |
pttl |
pttl(key) |
i64 |
expiretime |
expiretime(key) |
i64 (unix seconds, -1 or -2 if no expiry/missing) |
pexpiretime |
pexpiretime(key) |
i64 (unix millis) |
key_type |
key_type(key) |
String |
keys |
keys(pattern) |
Vec<Bytes> |
rename |
rename(key, newkey) |
() |
scan |
scan(cursor, match, count) |
ScanPage |
lists
| command |
signature |
returns |
lpush |
lpush(key, &[value, ...]) |
i64 (new length) |
rpush |
rpush(key, &[value, ...]) |
i64 |
lpop |
lpop(key) |
Option<Bytes> |
rpop |
rpop(key) |
Option<Bytes> |
lrange |
lrange(key, start, stop) |
Vec<Bytes> |
llen |
llen(key) |
i64 |
lmpop |
lmpop(&[key, ...], left, count) |
Option<(String, Vec<Bytes>)> |
hashes
| command |
signature |
returns |
hset |
hset(key, &[(field, value), ...]) |
i64 (new fields added) |
hget |
hget(key, field) |
Option<Bytes> |
hgetall |
hgetall(key) |
Vec<(Bytes, Bytes)> |
hmget |
hmget(key, &[field, ...]) |
Vec<Option<Bytes>> |
hdel |
hdel(key, &[field, ...]) |
i64 |
hexists |
hexists(key, field) |
bool |
hlen |
hlen(key) |
i64 |
hincrby |
hincrby(key, field, delta) |
i64 |
hkeys |
hkeys(key) |
Vec<Bytes> |
hvals |
hvals(key) |
Vec<Bytes> |
hrandfield |
hrandfield(key, count) |
Vec<Bytes> |
hrandfield_withvalues |
hrandfield_withvalues(key, count) |
Vec<(Bytes, Bytes)> |
sets
| command |
signature |
returns |
sadd |
sadd(key, &[member, ...]) |
i64 (new members) |
srem |
srem(key, &[member, ...]) |
i64 |
smembers |
smembers(key) |
Vec<Bytes> |
sismember |
sismember(key, member) |
bool |
scard |
scard(key) |
i64 |
smove |
smove(src, dst, member) |
bool |
sintercard |
sintercard(&[key, ...], limit) |
i64 (0 = no limit) |
sorted sets
| command |
signature |
returns |
zadd |
zadd(key, &[(score, member), ...]) |
i64 (new members) |
zrange |
zrange(key, start, stop) |
Vec<Bytes> |
zrange_withscores |
zrange_withscores(key, start, stop) |
Vec<(Bytes, f64)> |
zscore |
zscore(key, member) |
Option<f64> |
zrank |
zrank(key, member) |
Option<i64> |
zrem |
zrem(key, &[member, ...]) |
i64 |
zcard |
zcard(key) |
i64 |
zmpop |
zmpop(&[key, ...], min, count) |
Option<(String, Vec<(Bytes, f64)>)> |
zrandmember |
zrandmember(key, count) |
Vec<Bytes> |
zrandmember_withscores |
zrandmember_withscores(key, count) |
Vec<(Bytes, f64)> |
bitmaps
| command |
signature |
returns |
getbit |
getbit(key, offset) |
i64 (0 or 1) |
setbit |
setbit(key, offset, value) |
i64 (old bit value) |
bitcount |
bitcount(key, range) |
i64 (range is Option<(i64, i64, &str)> where unit is "BYTE" or "BIT") |
bitpos |
bitpos(key, bit, range) |
i64 (position of first matching bit) |
bitop |
bitop(op, dest, &[key, ...]) |
i64 (length of result string) |
server
| command |
signature |
returns |
ping |
ping() |
() |
echo |
echo(message) |
Bytes |
dbsize |
dbsize() |
i64 |
flushdb |
flushdb() |
() |
info |
info(section) |
String |
bgsave |
bgsave() |
String |
bgrewriteaof |
bgrewriteaof() |
String |
slowlog
| command |
signature |
returns |
slowlog_get |
slowlog_get(count) |
Vec<SlowlogEntry> |
slowlog_len |
slowlog_len() |
i64 |
slowlog_reset |
slowlog_reset() |
() |
pub/sub
| command |
signature |
returns |
publish |
publish(channel, message) |
i64 (receivers) |
pubsub_channels |
pubsub_channels(pattern) |
Vec<Bytes> |
pubsub_numsub |
pubsub_numsub(&[channel, ...]) |
Vec<(Bytes, i64)> |
pubsub_numpat |
pubsub_numpat() |
i64 |
subscribe |
subscribe(&[channel, ...]) — consumes self |
Subscriber |
psubscribe |
psubscribe(&[pattern, ...]) — consumes self |
Subscriber |
raw
client.send(&["COMMAND", "arg1", "arg2"]).await?;
pub/sub
subscribe and psubscribe consume the Client, returning a dedicated Subscriber handle. This prevents accidentally mixing pub/sub and regular commands on the same connection.
use ember_client::{Client, Message};
let client = Client::connect("127.0.0.1", 6379).await?;
let mut sub = client.subscribe(&["events", "alerts"]).await?;
loop {
match sub.recv().await? {
Some(Message { channel, payload, .. }) => {
println!("[{}] {}", String::from_utf8_lossy(&channel), String::from_utf8_lossy(&payload));
}
None => break,
}
}
Pattern subscriptions work the same way via psubscribe. The Message type also carries the pattern field when using pattern subscriptions.
vector commands
Requires the vector cargo feature:
ember-client = { version = "0.1", features = ["vector"] }
| command |
signature |
returns |
vadd |
vadd(key, element, vector) |
bool |
vadd_batch |
vadd_batch(key, &[(element, vector), ...]) |
i64 |
vsim |
vsim(key, query_vector, count) |
Vec<SimResult> |
vrem |
vrem(key, element) |
bool |
vget |
vget(key, element) |
Option<Vec<f32>> |
vcard |
vcard(key) |
i64 |
vdim |
vdim(key) |
i64 |
vinfo |
vinfo(key) |
Vec<(Bytes, Bytes)> |
use ember_client::Client;
let mut client = Client::connect("127.0.0.1", 6379).await?;
client.vadd("embeddings", "doc-1", &[0.1, 0.9, 0.2]).await?;
client.vadd("embeddings", "doc-2", &[0.8, 0.1, 0.5]).await?;
let results = client.vsim("embeddings", &[0.1, 0.8, 0.3], 5).await?;
for r in results {
println!("{}: {:.4}", r.element, r.score);
}
public types
ScanPage
pub struct ScanPage {
pub cursor: u64,
pub keys: Vec<Bytes>,
}
SlowlogEntry
pub struct SlowlogEntry {
pub id: i64,
pub timestamp: i64,
pub duration_us: i64,
pub args: Vec<Bytes>,
}
Message
pub struct Message {
pub kind: String,
pub pattern: Option<Bytes>,
pub channel: Bytes,
pub payload: Bytes,
}
SimResult
pub struct SimResult {
pub element: String,
pub score: f32,
}
pipeline builder
All pipeline methods mirror the async API. The builder is chainable and zero-alloc until execute_pipeline is called.
| method |
notes |
get(key) |
|
set(key, value) |
|
del(&[key, ...]) |
|
exists(&[key, ...]) |
|
expire(key, seconds) |
|
pexpire(key, millis) |
|
ttl(key) |
|
incr(key) |
|
decr(key) |
|
incrby(key, delta) |
|
strlen(key) |
|
incr_by_float(key, delta) |
|
unlink(&[key, ...]) |
|
rename(key, newkey) |
|
key_type(key) |
|
keys(pattern) |
|
echo(message) |
|
lpush(key, &[value, ...]) |
|
rpush(key, &[value, ...]) |
|
lpop(key) |
|
rpop(key) |
|
llen(key) |
|
hget(key, field) |
|
hset(key, &[(field, value), ...]) |
|
hdel(key, &[field, ...]) |
|
hincrby(key, field, delta) |
|
hmget(key, &[field, ...]) |
|
sadd(key, &[member, ...]) |
|
srem(key, &[member, ...]) |
|
scard(key) |
|
zadd(key, &[(score, member), ...]) |
|
zcard(key) |
|
zscore(key, member) |
|
publish(channel, message) |
|
ping() |
|
send(&[arg, ...]) |
raw escape hatch |
errors
ClientError covers the following cases:
| variant |
meaning |
Io |
TCP-level failure |
Protocol |
unexpected RESP3 frame shape |
Server |
server returned an error reply (WRONGTYPE, NOAUTH, etc.) |
Disconnected |
server closed the connection |
Timeout |
connect or read timed out (5 s / 10 s defaults) |
tls
ember-client = { version = "0.1", features = ["tls"] }
use ember_client::{Client, tls::TlsClientConfig};
let tls = TlsClientConfig::default();
let mut client = Client::connect_tls("my-ember-host", 6380, &tls).await?;