§Kafka store service
Hosts a store similar to the Kafka stores used in Kafka Streams. However, this library does not provide Kafka Streams itself: it is only meant to make a queryable store available as a service.
It still writes to the changelog in the same way, and recovers from it after a restart.
§Key & Value
Keys and values are intended to implement TryFrom<Vec<u8>> and Into<Vec<u8>>.
For examples, see the test_protos module.
For the standalone web service they also need to work with serde.
The store accepts two distinct value types for cases where the topic value
and the stored value differ and need conversion. The store's value type needs
to implement From<X>, where X is the value type of the topic. See the types in Store.
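The conversions described above can be sketched with plain standard-library traits. This is a minimal illustration, not code from this crate: `TopicUser`, `StoredUser` and the byte layout (4 bytes of big-endian id followed by a UTF-8 name) are all hypothetical, and the exact trait bounds the store requires are defined by StoreType and StoreKey.

```rust
use std::convert::TryFrom;

/// Hypothetical value type as it arrives on the source topic.
#[derive(Debug, Clone, PartialEq)]
pub struct TopicUser {
    pub id: u32,
    pub first: String,
    pub last: String,
}

/// Hypothetical value type kept in the store.
#[derive(Debug, Clone, PartialEq)]
pub struct StoredUser {
    pub id: u32,
    pub name: String,
}

// Topic value -> store value, the From<X> conversion mentioned above.
impl From<TopicUser> for StoredUser {
    fn from(u: TopicUser) -> Self {
        StoredUser { id: u.id, name: format!("{} {}", u.first, u.last) }
    }
}

// Store value -> bytes. Implementing From gives Into<Vec<u8>> for free.
impl From<StoredUser> for Vec<u8> {
    fn from(u: StoredUser) -> Vec<u8> {
        let mut out = u.id.to_be_bytes().to_vec();
        out.extend_from_slice(u.name.as_bytes());
        out
    }
}

// Bytes -> store value, failing on short input or invalid UTF-8.
impl TryFrom<Vec<u8>> for StoredUser {
    type Error = String;

    fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
        if bytes.len() < 4 {
            return Err(format!("expected at least 4 bytes, got {}", bytes.len()));
        }
        let (head, tail) = bytes.split_at(4);
        let id = u32::from_be_bytes([head[0], head[1], head[2], head[3]]);
        let name = String::from_utf8(tail.to_vec()).map_err(|e| e.to_string())?;
        Ok(StoredUser { id, name })
    }
}
```

In practice a protobuf or similar encoding would likely replace the hand-written layout; the point is only that the value round-trips through `Vec<u8>` and that the stored type can be built from the topic type.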
§STORE
When using SingleStoreApp, these environment variables are read and
passed as a StoreConfig to the Store.
They are read only when using SingleStoreApp. Otherwise, creating a Store
requires passing these values manually at creation.
This prevents clap from taking over CLI argument handling in your
application, which happens only when using SingleStoreApp.
Variables:
- STORE_ID: The id of this store, must be unique and URL safe.
- SOURCE_TOPIC: The topic from which the store gets its main data.
- CHANGELOG_TOPIC: This is the topic used to recover from restarts.
- KAFKA_BROKERS: comma separated list of kafka brokers.
- INITIAL_CAPACITY: how many items to preallocate at boot; reduces unnecessary allocations.
- PARTITIONS: comma separated list of partitions to consume, empty for all.
- USE_PROCESSING_ONLY: (true|false), default false. When true, the instance only processes messages into the changelog without adding items to the in-memory map, so the map is always empty. Because of that, callbacks are not triggered properly in this mode.
- PARTITIONS_REPLICA: comma separated list of partitions to serve as copies. These partitions are replicas: they are only read from the changelog and do not contribute to writing back to it.
- USE_REPLICA_ONLY: (true|false), default false. When true, the instance acts only as a replica: it does not spawn any main consumer or write to the changelog, it only reads from it.
- WAIT_FOR_REPLICAS: (true|false), default false. When true, the replica rebuild from the changelog blocks at bootstrap before serving requests; otherwise the endpoints start serving even while replica partitions are incomplete (still building).
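To make the value formats in the list above concrete, here is a small sketch of how a comma separated partition list and a true|false flag could be interpreted. The functions `parse_partitions` and `parse_flag` are hypothetical helpers written for illustration; they are not this crate's parser, and the crate may treat whitespace or unexpected values differently.

```rust
use std::num::ParseIntError;

/// Hypothetical helper: parses a comma separated list such as "0,1,2".
/// An empty input yields an empty Vec, taken here to mean "all partitions".
pub fn parse_partitions(raw: &str) -> Result<Vec<i32>, ParseIntError> {
    raw.split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::parse::<i32>)
        .collect()
}

/// Hypothetical helper: only the literal "true" enables a flag.
/// A missing variable falls back to the documented default of false.
pub fn parse_flag(raw: Option<&str>) -> bool {
    matches!(raw.map(str::trim), Some("true"))
}
```

With these helpers, a value such as `PARTITIONS_REPLICA=3,4` would become `vec![3, 4]`, and an unset `USE_REPLICA_ONLY` would read as `false`.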
§WEB
When using SingleStoreApp the setup has these additional parts:
- The items can be queried directly in the endpoint GET ...:8080/stores/(store_id)/(key)
- Flushing the store can be issued with POST ...:8080/flush-store/(store_id)
- Metrics are available at /metrics/prometheus and there is a simple OK response at /status/liveness to check if the server is up.
The web server can be configured with these environment variables:
- SERVER_BIND: (default 0.0.0.0) network address to bind and listen on
- SERVER_PORT: (default 8080) port to listen on
- STRUCTURED_LOGGING: true|false (default false)
- SENTRY_URL: url inclusive of key for sending telemetry to sentry
More can be found in the velvet_web crate,
for example TLS setup.
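As a rough illustration of the routes listed in this section, the sketch below builds the request URLs for a given host and port. `StoreEndpoints` is a hypothetical client-side helper, not part of this crate, and the plain `http` scheme is an assumption (a TLS setup would use `https`).

```rust
/// Hypothetical helper that formats the URLs of the documented routes.
pub struct StoreEndpoints {
    base: String,
}

impl StoreEndpoints {
    /// `host` and `port` are where the server listens (SERVER_PORT defaults to 8080).
    /// Assumes plain HTTP; no TLS.
    pub fn new(host: &str, port: u16) -> Self {
        StoreEndpoints { base: format!("http://{}:{}", host, port) }
    }

    /// URL for `GET /stores/(store_id)/(key)`.
    pub fn get_item(&self, store_id: &str, key: &str) -> String {
        format!("{}/stores/{}/{}", self.base, store_id, key)
    }

    /// URL for `POST /flush-store/(store_id)`.
    pub fn flush(&self, store_id: &str) -> String {
        format!("{}/flush-store/{}", self.base, store_id)
    }

    /// URL of the Prometheus metrics endpoint.
    pub fn metrics(&self) -> String {
        format!("{}/metrics/prometheus", self.base)
    }

    /// URL of the liveness check.
    pub fn liveness(&self) -> String {
        format!("{}/status/liveness", self.base)
    }
}
```

The helper does no escaping, which is one reason the store id (and, for this route, the key) should be URL safe.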
§Types
If only an in-memory store is needed without any web hosting, the Store can be used directly, with key and value types that implement StoreKey and StoreType respectively.
If the standalone web service is used (SingleStoreApp), the key and value types need more traits: AppKey and AppValue.
A simple store application that uses String for both key and value:
use streex::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let app = SingleStoreApp::<String, String>::new().await;
    let store = app.store_handle();
    // Do other stuff with store, it is clone/send/sync.
    // ...
    app.start().await?;
    Ok(())
}
§GRPC
gRPC requires more code, as the server is not handled by this library.
An example of composing SingleStoreApp with a gRPC server
using tonic could look like:
use tonic::{Status, transport::Server};
use streex::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    use your_protolib::Key as K;
    use your_protolib::Value as V;
    // The trait that represents the gRPC service
    use your_protolib::store_server::Store as ProtoStore;
    // The partial implementation gRPC generates for a service
    use your_protolib::store_server::StoreServer as ProtoServer;

    let app = SingleStoreApp::<K, V>::new().await;
    // Get a handle to the store so we can call methods on it directly.
    // This store is cloneable as it only contains references.
    let store = app.store_handle();

    struct MyKVServer {
        store: Store<K, V>,
    }

    #[tonic::async_trait]
    impl ProtoStore for MyKVServer {
        async fn get(
            &self,
            req: tonic::Request<K>,
        ) -> tonic::Result<tonic::Response<V>, Status> {
            // Any lookup error or missing key is reported as NOT_FOUND.
            let Ok(Some(val)) = self.store.get(req.get_ref()).await else {
                return Err(Status::not_found(""));
            };
            Ok(tonic::Response::new(val.clone()))
        }
    }

    let svc = ProtoServer::new(MyKVServer {
        store: store.clone(),
    });
    let addr = "[::]:50051".parse()?;
    // Run the gRPC server in the background while the store app runs.
    let h = tokio::spawn(Server::builder().add_service(svc).serve(addr));
    app.start().await?;
    h.await??;
    Ok(())
}
Modules§
Structs§
- ReadHandle: A stable handle that can be used to access the map in read mode. When using the read pool is not desired, since that requires acquiring a handle from a pool of mutexes, this handle remains stable for whoever requests it and takes advantage of the lock-free design of left-right.
- SingleStoreApp: This is the most common way to create and host a store.
- Store: Kafka store.
- StoreCallbacks: Optional callback setup.
- StoreConfig: Configuration used to create a Store.
Traits§
- AppKey
- AppValue
- OnStoreDelete: Callback for item deletion.
- OnStoreNew: Callback for item insertion, when the item is new.
- OnStoreUpdate: Callback on store item update.
- StoreKey: This is the type required for the key of the store. It extends the StoreType.
- StoreType: This is the type for both key and value for the store, and defines the required traits for them.