Synap Rust SDK
Official Rust client library for Synap - High-Performance In-Memory Key-Value Store & Message Broker.
Features
- ๐พ Key-Value Store: Fast async KV operations with TTL support
- ๐จ Message Queues: RabbitMQ-style queues with ACK/NACK + reactive consumption
- ๐ก Event Streams: Kafka-style event streams reactive by default ๐ฅ
- ๐ Pub/Sub: Topic-based messaging reactive by default ๐ฅ
- ๐ Reactive Patterns:
futures::Streamfor event-driven consumption - ๐งพ Transactions: MULTI/EXEC/WATCH/DISCARD helpers
- ๐ Lua Scripting: EVAL/EVALSHA with SHA1 caching
- ๐ข HyperLogLog: Cardinality estimation (PFADD/PFCOUNT/PFMERGE)
- โก StreamableHTTP Protocol: Single unified endpoint for all operations
- ๐ก๏ธ Type-Safe: Leverages Rust's type system for correctness
- ๐ฆ Async/Await: Built on Tokio for high-performance async I/O
Installation
Add this to your Cargo.toml:
[]
= "0.1"
= { = "1", = ["full"] }
Quick Start
use ;
use json;
async
Transports
Since v0.11.0 the SDK selects the transport via URL scheme โ no separate builder options required:
| URL scheme | Default port | When to use |
|---|---|---|
synap:// |
15501 |
โ
Recommended default โ MessagePack over persistent TCP, lowest latency, preserves int/float/bool/bytes on the wire. |
resp3:// |
6379 |
Redis-compatible text protocol โ interop with existing Redis tooling. |
http:// / https:// |
15500 |
Original REST transport โ full command coverage. |
All commands (KV, Hash, List, Set, Sorted Set, Queue, Stream, Pub/Sub, Transactions, Scripts, Geo, HyperLogLog) are fully supported on every transport. Native transports raise SynapError::UnsupportedCommand instead of silently falling back to HTTP.
use ;
// SynapRPC โ recommended default
let cfg = new;
let client = new?;
// RESP3 โ Redis-compatible
let cfg = new;
let client = new?;
// HTTP โ full REST access
let cfg = new;
let client = new?;
Queue, stream and pub/sub over synap://:
// Queue round-trip
client.queue.create_queue.await?;
let id = client.queue.publish.await?;
let msg = client.queue.consume.await?;
client.queue.ack.await?;
// Stream publish + read
client.stream.create_room.await?;
client.stream.publish.await?;
let events = client.stream.read.await?;
// Reactive pub/sub (server-push over dedicated TCP connection)
client.pubsub.subscribe.await?;
client.pubsub.publish.await?;
End-to-end test suite
A real-server E2E suite covers all three transports plus cross-transport consistency (write via one, read via the others):
API Reference
Key-Value Store
// Set a value
client.kv.set.await?;
client.kv.set.await?; // with TTL
// Get a value
let value: = client.kv.get.await?;
let number: = client.kv.get.await?;
// Delete a key
client.kv.delete.await?;
// Check existence
let exists = client.kv.exists.await?;
// Atomic operations
let new_value = client.kv.incr.await?;
let new_value = client.kv.decr.await?;
// Get statistics
let stats = client.kv.stats.await?;
info!;
Message Queues
// Create a queue
client.queue.create_queue.await?;
// Publish a message
let msg_id = client.queue.publish.await?;
// Consume a message
let message = client.queue.consume.await?;
if let Some = message
// Get queue stats
let stats = client.queue.stats.await?;
info!;
// List all queues
let queues = client.queue.list.await?;
// Delete a queue
client.queue.delete_queue.await?;
Event Streams (Reactive by Default)
Event streams are reactive by default - use observe_events() or observe_event() for continuous event consumption.
use StreamExt;
use Duration;
// Create a stream room
client.stream.create_room.await?;
// Publish an event
let offset = client.stream.publish.await?;
// โจ Reactive: Observe ALL events
let = client.stream
.observe_events;
spawn;
// โจ Reactive: Observe SPECIFIC event type
let = client.stream
.observe_event;
while let Some = messages.next.await
// Stop observing
handle.unsubscribe;
handle2.unsubscribe;
// Get room stats
let stats = client.stream.stats.await?;
// List all rooms
let rooms = client.stream.list.await?;
// Delete a room
client.stream.delete_room.await?;
Pub/Sub (Reactive by Default)
Pub/Sub is reactive by default - use subscribe() for event-driven message consumption.
use HashMap;
// Publish to a topic
let delivered_count = client.pubsub.publish.await?;
// โจ Subscribe to topics (with wildcards) - REST API
let sub_id = client.pubsub.subscribe_topics.await?;
// โจ Reactive: Observe messages via WebSocket (recommended)
use StreamExt;
let = client.pubsub
.observe;
spawn;
// Stop observing
handle.unsubscribe;
// Or observe a single topic
let = client.pubsub
.observe_topic;
// Unsubscribe (REST API)
client.pubsub.unsubscribe.await?;
// List active topics
let topics = client.pubsub.list_topics.await?;
Configuration
use SynapConfig;
use Duration;
let config = new
.with_timeout
.with_auth_token
.with_max_retries;
let client = new?;
Error Handling
use SynapError;
match client.kv..await
Reactive Programming (RxJS-style)
The SDK now includes RxJS-style reactive patterns via the rx module:
use ;
// Observable with operators (like RxJS pipe)
let obs = from_stream;
obs.filter
.map
.take
.subscribe_next;
// Subject for multicasting
let subject = new;
subject.subscribe;
subject.subscribe;
subject.next; // Both subscribers receive it
See src/rx/README.md for complete guide.
Examples
See the examples/ directory for more examples:
basic.rs- Basic KV operationsqueue.rs- Task queue pattern (traditional)reactive_queue.rs- Reactive queue consumption ๐ฅstream.rs- Event stream (traditional)reactive_stream.rs- Reactive event consumption ๐ฅpubsub.rs- Pub/Sub messagingreactive_pubsub.rs- Reactive Pub/Sub subscriptions ๐ฅrxjs_style.rs- RxJS-style patterns โญ NEW
Run an example:
Testing
# Run tests (requires Synap server running on localhost:15500)
# Or use a custom server URL
SYNAP_URL=http://localhost:15500
License
MIT License - See LICENSE for details.