use std::collections::{BTreeMap, BTreeSet};
use super::query::Query;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Position(pub BTreeMap<String, BTreeMap<i32, i64>>);
impl Position {
#[must_use]
pub fn offset(&self, topic: &str, partition: i32) -> Option<i64> {
self.0.get(topic).and_then(|m| m.get(&partition)).copied()
}
#[must_use]
pub(crate) fn dominates(&self, bound: &Position) -> bool {
bound.0.iter().all(|(topic, parts)| {
parts
.iter()
.all(|(p, off)| self.offset(topic, *p).is_some_and(|cur| cur >= *off))
})
}
}
#[derive(Debug, Clone, Default)]
pub enum PositionBound {
#[default]
Unbounded,
At(Position),
}
#[derive(Debug, Clone, Default)]
pub(crate) enum PartitionSel {
#[default]
All,
Set(BTreeSet<i32>),
}
pub struct StateQuery<Q: Query> {
pub(crate) store: String,
pub(crate) query: Q,
pub(crate) partitions: PartitionSel,
pub(crate) bound: PositionBound,
pub(crate) require_active: bool,
}
impl<Q: Query> StateQuery<Q> {
#[must_use]
pub fn with_partitions(mut self, set: BTreeSet<i32>) -> Self {
self.partitions = PartitionSel::Set(set);
self
}
#[must_use]
pub fn with_all_partitions(mut self) -> Self {
self.partitions = PartitionSel::All;
self
}
#[must_use]
pub fn with_position_bound(mut self, bound: PositionBound) -> Self {
self.bound = bound;
self
}
#[must_use]
pub fn require_active(mut self) -> Self {
self.require_active = true;
self
}
}
pub struct StateQueryRequest;
impl StateQueryRequest {
#[must_use]
pub fn in_store(name: impl Into<String>) -> StateQueryRequestBuilder {
StateQueryRequestBuilder { store: name.into() }
}
}
pub struct StateQueryRequestBuilder {
store: String,
}
impl StateQueryRequestBuilder {
#[must_use]
pub fn with_query<Q: Query>(self, query: Q) -> StateQuery<Q> {
StateQuery {
store: self.store,
query,
partitions: PartitionSel::All,
bound: PositionBound::Unbounded,
require_active: false,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn pos(entries: &[(&str, i32, i64)]) -> Position {
let mut m: BTreeMap<String, BTreeMap<i32, i64>> = BTreeMap::new();
for (t, p, o) in entries {
m.entry((*t).to_string()).or_default().insert(*p, *o);
}
Position(m)
}
#[test]
fn dominates_requires_all_bound_partitions_met() {
let cur = pos(&[("in", 0, 10), ("in", 1, 5)]);
assert!(cur.dominates(&pos(&[("in", 0, 10)])));
assert!(cur.dominates(&pos(&[("in", 0, 9), ("in", 1, 5)])));
assert!(!cur.dominates(&pos(&[("in", 0, 11)]))); assert!(!cur.dominates(&pos(&[("other", 0, 1)]))); }
#[test]
fn position_offset_reads_present_and_absent_tps() {
let p = pos(&[("in", 0, 10), ("in", 1, 5)]);
assert_eq!(p.offset("in", 0), Some(10));
assert_eq!(p.offset("in", 1), Some(5));
assert_eq!(p.offset("in", 2), None); assert_eq!(p.offset("other", 0), None); }
#[test]
fn builder_defaults_and_store_name() {
use crate::runtime::iqv2::query::KeyQuery;
let q = StateQueryRequest::in_store("s")
.with_query(KeyQuery::<String, i64>::with_key("k".into()));
assert_eq!(q.store, "s");
assert!(matches!(q.partitions, PartitionSel::All));
assert!(matches!(q.bound, PositionBound::Unbounded));
assert!(!q.require_active);
}
#[test]
fn builder_chain_sets_each_field() {
use crate::runtime::iqv2::query::KeyQuery;
let set: BTreeSet<i32> = [0, 2].into_iter().collect();
let q = StateQueryRequest::in_store("s")
.with_query(KeyQuery::<String, i64>::with_key("k".into()))
.with_partitions(set.clone());
match &q.partitions {
PartitionSel::Set(s) => assert_eq!(s, &set),
PartitionSel::All => panic!("expected explicit partition set"),
}
let q = q.with_all_partitions();
assert!(matches!(q.partitions, PartitionSel::All));
let bound_pos = pos(&[("in", 0, 7)]);
let q = q.with_position_bound(PositionBound::At(bound_pos.clone()));
match &q.bound {
PositionBound::At(p) => assert_eq!(p, &bound_pos),
PositionBound::Unbounded => panic!("expected At bound"),
}
let q = q.require_active();
assert!(q.require_active);
}
}