# `streex` [](https://crates.io/crates/streex) [](https://nogithub.codeberg.page)
A kafka store service.
Reads a topic and hosts in memory all its items, each item as last updated, and
serves its items for being fetched by id.
This works similarly to how kafka stores work, by having a consumer group on
the input topic and ingesting the elements in memory, while also being capable
of recovering from restarts by writing also into a changelog topic.
The changelog topic is a uniquely reserved topic for this store, and the store
will also have a unique id. Both specified by the user via configuration.
The in memory store is eventually consistent between write flush operations,
however the changelog topic is always kept up to date and written even if the
flush did not happen. The flush opeartion only affects in memory data.
## Usage
### Key & Value
Key and Values are intended to be `TryFrom<Vec<u8>>` and `Into<Vec<u8>>`.
In case of the stantalone web service they will also need to be serde
`Serialize` and `Deserialize`.
The store accepts 2 distinct Value types if they are different and need
conversion. The value of the store needs to implement `From<X>` where X is
the value type of the topic. See types in [docs](https://docs.rs/streex).
### Configuration
The minimal configuration needed is to specify the store id and the kafka
variables. An example is, either by env or .env:
```sh
STORE_ID="test"
SOURCE_TOPIC="test-input"
CHANGELOG_TOPIC="test-changelog"
KAFKA_BROKERS="localhost:9092"
```
### Launch the store
This will launch a store and the http endpoint for getting its items.
Assuming the types `Key` and `Value` defined by the user:
```rust
#[tokio::main]
async fn main() {
streex::SingleStoreApp::<Key, Value>::new()
.await.start()
.await.unwrap();
}
```
Assuming the configuration shown before, endpoints published will be:
- `GET` `http://localhost:8080/stores/test/<key>` to get the value for key.
- `POST` `http://localhost:8080/flush-store/test` to flush the write queue.
## Advanced configuration
### Initial capacity
The indexmap is a structure that keeps its buckets contiguous and the store
will use swap_remove instructions to keep it like that. The last items of the
bucket array will be used to fill in back deleted items. The initial capacity
of the map is 0 so it will cause numerous allocation at the beginning or when
restarting from the changelog.
To avoid that, the map can also be pre allocated with an initial size with:
```sh
INITIAL_CAPACITY=5000000
```
### Auto flush
The store uses internally a [ChiralMap](https://docs.rs/chiralmap).
This means that the store will contain 2 copies of the data that will be
swapped when a flush happens. The flush can be requested with the endpoint or
configured with the autoflush variables. The values shown here are their
default:
```sh
AUTO_FLUSH_COUNT=1000
AUTO_FLUSH_SECONDS=1
```
### Reader pool
The readers of the map have a limited pool. This comes down to the nature of
mixing them with async/web runtimes. The reader themselves are lock-free and
they are not affected by the write operations but they are acquired from a
synchronized pool.
The size of the pool determines how many concurrent reads can happen.
Defaults to 1000. It can be configured as:
```sh
READER_POOL_SIZE=1000
```
### Manual assign of partitions
The store can be configured to only listen to specific partitions of the source
topic, and along with that, only manage those partitions also in the changelog.
This allows to have multiple instances of the store and split them by the list
of their partitions. The default for the store is to always listen to all
partitions, however a list of them can be configured as:
```sh
PARTITIONS=0,1,2,3...
```
### Process only mode
Enabling this mode will prevent any data to be stored in memory while the
messages are still processed into the changelog topic.
This mode is useful if a set of replica nodes are used, but don't want to have
primary nodes also be serving data or requiring high memory usage, and let them
focus only on writing data.
This mode does not work well with callbacks, as some callbacks require the
current state in memory to work (eg. updates).
Type conversion still works across topics.
```sh
USE_PROCESSING_ONLY=true
```
### Replication
The store can also hold replicas of data which are read only and for redundancy
and high availability purposes. In an example multi node scenario where there
are partitions 1,2,3 it's possible to setup 3 nodes each of which will have
these partitions as primary (node 1 > partition 1 etc) plus a copy of the other
partitions so if a node goes down or is in the middle of restarting the data
can still be queried from the replica of another node.
These copied partitions do not write into the changelog and don't consume from
the source topic, they only read from the changelog to keep a copy and keep
listening to the changelog (where the primary ones will write) to keep up to
date with the newly processed data.
```sh
# which partition to keep as an extra copy in this instance
PARTITIONS_REPLICA=4,5
# when this is true this node will be a replica only, ignoring all the primary
# partitions and source topic, only being a read only copy of the changelog
USE_REPLICA_ONLY=true
# when this is true this node will wait at bootstrap for the replica to be
# filled up from the changelog. Otherwise only the primary will block on
# startup and the replicas will be asynchronously building in the background.
# Use this to true if you need consistency of replicas immediately after boot,
# often used in nodes that are replica only.
WAIT_FOR_REPLICAS=true
```
### librdkafka
The store uses librdkafka internally from [rdkafka](https://docs.rs/rdkafka).
There are 3 instances of kafka clients: `consumer`, `producer` and `changelog`.
Each of them can be configured additionally with they key/value
[properties](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
of librdkafka.
The instances are:
- `consumer`: used to consume the input topic.
- `producer`: used to write into the changelog.
- `changelog`: used to consume the changelog at bootstrap.
Example configuration:
```sh
KAFKA_CONSUMER_PROPERTIES=receive.message.max.bytes=10000
KAFKA_PRODUCER_PROPERTIES=client.id=myclient,message.max.bytes=100000
KAFKA_CHANGELOG_PROPERTIES=max.in.flight=1000
```