# `streex`  [](https://nogithub.codeberg.page)
A kafka store service.
Reads a topic and hosts in memory all its items, each item as last updated, and
serves its items for being fetched by id.
This works similarly to how kafka stores work, by having a consumer group on
the input topic and ingesting the elements in memory, while also being capable
of recovering from restarts by writing also into a changelog topic.
The changelog topic is a uniquely reserved topic for this store, and the store
will also have a unique id. Both specified by the user via configuration.
The in memory store is eventually consistent between write flush operations,
however the changelog topic is always kept up to date and written even if the
flush did not happen. The flush opeartion only affects in memory data.
## Usage
### Key & Value
Key and Values are intended to be `TryFrom<Vec<u8>>` and `Into<Vec<u8>>`.
In case of the stantalone web service they will also need to be serde
`Serialize` and `Deserialize`.
The store accepts 2 distinct Value types if they are different and need
conversion. The value of the store needs to implement `From<X>` where X is
the value type of the topic. See types in [docs](https://docs.rs/streex).
### Configuration
The minimal configuration needed is to specify the store id and the kafka
variables. An example is, either by env or .env:
```sh
STORE_ID="test"
SOURCE_TOPIC="test-input"
CHANGELOG_TOPIC="test-changelog"
KAFKA_BROKERS="localhost:9092"
```
### Launch the store
This will launch a store and the http endpoint for getting its items.
Assuming the types `Key` and `Value` defined by the user:
```rust
#[tokio::main]
async fn main() {
streex::SingleStoreApp::<Key, Value>::new()
.await.start()
.await.unwrap();
}
```
Assuming the configuration shown before, endpoints published will be:
- `GET` `http://localhost:8080/stores/test/<key>` to get the value for key.
- `POST` `http://localhost:8080/flush-store/test` to flush the write queue.
## Advanced configuration
### Initial capacity
The indexmap is a structure that keeps its buckets contiguous and the store
will use swap_remove instructions to keep it like that. The last items of the
bucket array will be used to fill in back deleted items. The initial capacity
of the map is 0 so it will cause numerous allocation at the beginning or when
restarting from the changelog.
To avoid that, the map can also be pre allocated with an initial size with:
```sh
INITIAL_CAPACITY=5000000
```
### Auto flush
The store uses internally an [IndexMap](https://docs.rs/indexmap) and the
algorythm used to synchronize writes and reads is
[left_right](https://docs.rs/left-right).
This means that the store will contain 2 indexmaps that will be swapped when a
flush happens. The flush can be requested with the endpoint or configured with
the autoflush variables. The values shown here are their default:
```sh
AUTO_FLUSH_COUNT=1000
AUTO_FLUSH_SECONDS=1
```
### Reader pool
The readers of the map have a limited pool. This comes down to the nature of
mixing them with async/web runtimes. The reader themselves are lock-free and
they are not affected by the write operations but they are acquired from a
synchronized pool.
The size of the pool determines how many concurrent reads can happen.
Defaults to 1000. It can be configured as:
```sh
READER_POOL_SIZE=1000
```
### librdkafka
The store uses librdkafka internally from [rdkafka](https://docs.rs/rdkafka).
There are 3 instances of kafka clients: `consumer`, `producer` and `changelog`.
Each of them can be configured additionally with they key/value
[properties](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
of librdkafka.
The instances are:
- `consumer`: used to consume the input topic.
- `producer`: used to write into the changelog.
- `changelog`: used to consume the changelog at bootstrap.
Example configuration:
```sh
KAFKA_CONSUMER_PROPERTIES=receive.message.max.bytes=10000
KAFKA_PRODUCER_PROPERTIES=client.id=myclient,message.max.bytes=100000
KAFKA_CHANGELOG_PROPERTIES=max.in.flight=1000
```
### Internal buffer
The internal channel used to queue the write operations has a size of 1. For
most situations this is fine as writes are queued in their own oplog anyway,
but it could be useful to impact on some topic burst spikes to increase this
value.
Keep in mind however that this buffer is **destroyed on shutdow** so if you
required reliability and not lose any message, leave it to the default of 1.
```sh
INNER_BUFFER=1
```
### Manual assign of partitions
The store can be configured to only listen to specific partitions of the source
topic, and along with that, only manage those partitions also in the changelog.
This allows to have multiple instances of the store and split them by the list
of their partitions. The default for the store is to always listen to all
partitions, however a list of them can be configured as:
```sh
PARTITIONS=0,1,2,3...
```