use std::sync::Arc;
use arrow_array::{ArrayRef, BinaryArray, BooleanArray, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use crate::{FinalizedBucket, IssueKind, QueryHealth, ReplicaCandidate, ReplicaState};
/// Exports finalized `u64` buckets as an Arrow [`RecordBatch`], one row per bucket.
///
/// The resulting batch has four non-nullable columns, in this order:
///
/// | column  | Arrow type | source                    |
/// |---------|------------|---------------------------|
/// | `key`   | `Utf8`     | `bucket.key.to_string()`  |
/// | `start` | `UInt64`   | `bucket.start`            |
/// | `end`   | `UInt64`   | `bucket.end`              |
/// | `value` | `UInt64`   | `bucket.value`            |
///
/// # Errors
///
/// Propagates any [`ArrowError`] reported by [`RecordBatch::try_new`].
pub fn finalized_u64_buckets_to_record_batch<K>(
    buckets: &[FinalizedBucket<K, u64>],
) -> Result<RecordBatch, ArrowError>
where
    K: ToString,
{
    // Keys are stringified so that any `ToString` key type maps onto a Utf8 column.
    let key_column: ArrayRef = Arc::new(StringArray::from_iter_values(
        buckets.iter().map(|b| b.key.to_string()),
    ));
    let start_column: ArrayRef =
        Arc::new(UInt64Array::from_iter_values(buckets.iter().map(|b| b.start)));
    let end_column: ArrayRef =
        Arc::new(UInt64Array::from_iter_values(buckets.iter().map(|b| b.end)));
    let value_column: ArrayRef =
        Arc::new(UInt64Array::from_iter_values(buckets.iter().map(|b| b.value)));

    // Field order must match the column order handed to `try_new` below.
    let schema = Schema::new(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("start", DataType::UInt64, false),
        Field::new("end", DataType::UInt64, false),
        Field::new("value", DataType::UInt64, false),
    ]);

    RecordBatch::try_new(
        Arc::new(schema),
        vec![key_column, start_column, end_column, value_column],
    )
}
pub fn replica_candidates_to_record_batch<ReplicaId>(
candidates: &[ReplicaCandidate<ReplicaId>],
) -> Result<RecordBatch, ArrowError>
where
ReplicaId: ToString,
{
let range_starts = BinaryArray::from_iter_values(
candidates
.iter()
.map(|candidate| candidate.range.start.as_slice()),
);
let range_ends = BinaryArray::from_iter_values(
candidates
.iter()
.map(|candidate| candidate.range.end.as_slice()),
);
let replicas = StringArray::from_iter_values(
candidates
.iter()
.map(|candidate| candidate.replica.to_string()),
);
let start_times = UInt64Array::from_iter_values(
candidates
.iter()
.map(|candidate| candidate.quality.start_time),
);
let end_times = UInt64Array::from_iter_values(
candidates
.iter()
.map(|candidate| candidate.quality.end_time),
);
let points =
UInt64Array::from_iter_values(candidates.iter().map(|candidate| candidate.quality.points));
let expected_points = UInt64Array::from_iter_values(
candidates
.iter()
.map(|candidate| candidate.quality.expected_points),
);
let complete = BooleanArray::from_iter(
candidates
.iter()
.map(|candidate| Some(candidate.quality.complete)),
);
let states = StringArray::from_iter_values(
candidates
.iter()
.map(|candidate| replica_state_name(candidate.quality.state)),
);
RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("range_start", DataType::Binary, false),
Field::new("range_end", DataType::Binary, false),
Field::new("replica", DataType::Utf8, false),
Field::new("start_time", DataType::UInt64, false),
Field::new("end_time", DataType::UInt64, false),
Field::new("points", DataType::UInt64, false),
Field::new("expected_points", DataType::UInt64, false),
Field::new("complete", DataType::Boolean, false),
Field::new("state", DataType::Utf8, false),
])),
vec![
Arc::new(range_starts) as ArrayRef,
Arc::new(range_ends) as ArrayRef,
Arc::new(replicas) as ArrayRef,
Arc::new(start_times) as ArrayRef,
Arc::new(end_times) as ArrayRef,
Arc::new(points) as ArrayRef,
Arc::new(expected_points) as ArrayRef,
Arc::new(complete) as ArrayRef,
Arc::new(states) as ArrayRef,
],
)
}
/// Exports the issues recorded in a [`QueryHealth`] as an Arrow [`RecordBatch`],
/// one row per issue returned by `health.issues()`.
///
/// The resulting batch has three non-nullable `Utf8` columns, in this order:
///
/// * `child`  – `issue.child.to_string()`
/// * `kind`   – snake_case name of `issue.kind`
/// * `detail` – `issue.detail`
///
/// # Errors
///
/// Propagates any [`ArrowError`] reported by [`RecordBatch::try_new`].
pub fn query_health_to_record_batch<ChildId>(
    health: &QueryHealth<ChildId>,
) -> Result<RecordBatch, ArrowError>
where
    ChildId: ToString,
{
    // Child identifiers are stringified so any `ToString` id fits a Utf8 column.
    let child_column: ArrayRef = Arc::new(StringArray::from_iter_values(
        health.issues().iter().map(|entry| entry.child.to_string()),
    ));
    let kind_column: ArrayRef = Arc::new(StringArray::from_iter_values(
        health.issues().iter().map(|entry| issue_kind_name(entry.kind)),
    ));
    let detail_column: ArrayRef = Arc::new(StringArray::from_iter_values(
        health.issues().iter().map(|entry| entry.detail.as_str()),
    ));

    // Field order must match the column order handed to `try_new` below.
    let schema = Schema::new(vec![
        Field::new("child", DataType::Utf8, false),
        Field::new("kind", DataType::Utf8, false),
        Field::new("detail", DataType::Utf8, false),
    ]);

    RecordBatch::try_new(
        Arc::new(schema),
        vec![child_column, kind_column, detail_column],
    )
}
fn replica_state_name(state: ReplicaState) -> &'static str {
match state {
ReplicaState::Available => "available",
ReplicaState::Recovering => "recovering",
ReplicaState::Unavailable => "unavailable",
}
}
fn issue_kind_name(kind: IssueKind) -> &'static str {
match kind {
IssueKind::PrunedZone => "pruned_zone",
IssueKind::TimedOut => "timed_out",
IssueKind::StaleConfig => "stale_config",
IssueKind::MissingReplica => "missing_replica",
IssueKind::IncompleteData => "incomplete_data",
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ReplicaQuality, ReplicaResolver};

    /// A single finalized bucket becomes a one-row batch whose first column is `key`.
    #[test]
    fn finalized_buckets_export_as_record_batch() {
        let only_bucket = FinalizedBucket {
            key: "series-a",
            start: 0,
            end: 60,
            value: 42,
        };
        let buckets = vec![only_bucket];

        let batch = finalized_u64_buckets_to_record_batch(&buckets).unwrap();

        assert_eq!(batch.num_rows(), 1);
        let schema = batch.schema();
        assert_eq!(schema.field(0).name(), "key");
    }

    /// A single replica candidate becomes a one-row, nine-column batch, and the
    /// same candidate list is still usable by the resolver afterwards.
    #[test]
    fn replica_candidates_export_as_record_batch() {
        let key_range = b"a".to_vec()..b"m".to_vec();
        let quality = ReplicaQuality::new(0, 60, 60, 60, true, ReplicaState::Available);
        let candidates = vec![ReplicaCandidate::new(key_range, "leaf-a", quality)];

        let batch = replica_candidates_to_record_batch(&candidates).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 9);
        // Exporting only borrows the candidates, so they can still be resolved here.
        assert_eq!(
            ReplicaResolver::new().resolve(candidates)[0].primary,
            "leaf-a"
        );
    }
}