use std::any::Any;
use std::collections::BTreeMap;
use tokio::sync::oneshot;
use crate::store::iq::{Iq2Query, StoreKind};
use super::request::{PartitionSel, Position, PositionBound};
use super::result::{FailureReason, QueryResult, StateQueryResult};
pub(crate) struct Iq2Request {
pub store: String,
pub kind: StoreKind,
pub query: Iq2Query,
pub partitions: PartitionSel,
pub bound: PositionBound,
pub require_active: bool,
pub reply: oneshot::Sender<Iq2Outcome>,
}
type PartitionEntry = (i32, Position, Result<Box<dyn Any + Send>, FailureReason>);
pub(crate) struct Iq2Outcome {
pub per_partition: Vec<PartitionEntry>,
#[allow(dead_code)]
pub had_tasks: bool,
}
pub(crate) fn assemble<R: 'static>(outcome: Iq2Outcome) -> StateQueryResult<R> {
let mut map: BTreeMap<i32, QueryResult<R>> = BTreeMap::new();
for (partition, position, res) in outcome.per_partition {
let qr = match res {
Ok(boxed) => match boxed.downcast::<R>() {
Ok(r) => QueryResult::Success {
result: *r,
position,
},
Err(_) => QueryResult::Failure {
reason: FailureReason::StoreException,
message: "IQv2 result type mismatch".to_string(),
},
},
Err(reason) => QueryResult::Failure {
reason,
message: format!("{reason:?}"),
},
};
map.insert(partition, qr);
}
StateQueryResult::new(map)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::iqv2::request::Position;
use crate::runtime::iqv2::result::FailureReason;
#[test]
fn assemble_downcasts_success_and_maps_failures() {
let ok: Box<dyn std::any::Any + Send> = Box::new(Some(7_i64));
let outcome = Iq2Outcome {
per_partition: vec![
(0, Position::default(), Ok(ok)),
(1, Position::default(), Err(FailureReason::NotUpToBound)),
],
had_tasks: true,
};
let res = assemble::<Option<i64>>(outcome);
assert_eq!(res.partition_results().len(), 2);
assert_eq!(res.partition_results()[&0].result(), Some(&Some(7)));
assert_eq!(
res.partition_results()[&1].failure_reason(),
Some(FailureReason::NotUpToBound)
);
}
#[test]
fn assemble_type_mismatch_is_store_exception() {
let wrong: Box<dyn std::any::Any + Send> = Box::new("not an i64".to_string());
let outcome = Iq2Outcome {
per_partition: vec![(0, Position::default(), Ok(wrong))],
had_tasks: true,
};
let res = assemble::<Option<i64>>(outcome);
assert_eq!(
res.partition_results()[&0].failure_reason(),
Some(FailureReason::StoreException)
);
}
}