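//! scepter 0.1.5
//!
//! Composable primitives for planet-scale time-series routing, indexing, and aggregation.
//!
//! The helpers in this file export finalized buckets, replica candidates, and query health
//! issues as Arrow record batches.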
use std::sync::Arc;

use arrow_array::{ArrayRef, BinaryArray, BooleanArray, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{ArrowError, DataType, Field, Schema};

use crate::{FinalizedBucket, IssueKind, QueryHealth, ReplicaCandidate, ReplicaState};

/// Builds an Arrow [`RecordBatch`] holding one row per finalized `u64` bucket.
///
/// The batch has four non-nullable columns, in this order:
/// `key` (`Utf8`, produced with `ToString`), `start`, `end`, and `value` (all `UInt64`).
///
/// # Errors
///
/// Propagates any [`ArrowError`] returned by [`RecordBatch::try_new`].
pub fn finalized_u64_buckets_to_record_batch<K>(
    buckets: &[FinalizedBucket<K, u64>],
) -> Result<RecordBatch, ArrowError>
where
    K: ToString,
{
    // Every column is a separate pass over the same slice, so all arrays
    // end up with exactly `buckets.len()` rows.
    let key_column: ArrayRef = Arc::new(StringArray::from_iter_values(
        buckets.iter().map(|b| b.key.to_string()),
    ));
    let start_column: ArrayRef =
        Arc::new(UInt64Array::from_iter_values(buckets.iter().map(|b| b.start)));
    let end_column: ArrayRef =
        Arc::new(UInt64Array::from_iter_values(buckets.iter().map(|b| b.end)));
    let value_column: ArrayRef =
        Arc::new(UInt64Array::from_iter_values(buckets.iter().map(|b| b.value)));

    let schema = Schema::new(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("start", DataType::UInt64, false),
        Field::new("end", DataType::UInt64, false),
        Field::new("value", DataType::UInt64, false),
    ]);

    RecordBatch::try_new(
        Arc::new(schema),
        vec![key_column, start_column, end_column, value_column],
    )
}

/// Converts replica candidates into an Arrow record batch.
pub fn replica_candidates_to_record_batch<ReplicaId>(
    candidates: &[ReplicaCandidate<ReplicaId>],
) -> Result<RecordBatch, ArrowError>
where
    ReplicaId: ToString,
{
    let range_starts = BinaryArray::from_iter_values(
        candidates
            .iter()
            .map(|candidate| candidate.range.start.as_slice()),
    );
    let range_ends = BinaryArray::from_iter_values(
        candidates
            .iter()
            .map(|candidate| candidate.range.end.as_slice()),
    );
    let replicas = StringArray::from_iter_values(
        candidates
            .iter()
            .map(|candidate| candidate.replica.to_string()),
    );
    let start_times = UInt64Array::from_iter_values(
        candidates
            .iter()
            .map(|candidate| candidate.quality.start_time),
    );
    let end_times = UInt64Array::from_iter_values(
        candidates
            .iter()
            .map(|candidate| candidate.quality.end_time),
    );
    let points =
        UInt64Array::from_iter_values(candidates.iter().map(|candidate| candidate.quality.points));
    let expected_points = UInt64Array::from_iter_values(
        candidates
            .iter()
            .map(|candidate| candidate.quality.expected_points),
    );
    let complete = BooleanArray::from_iter(
        candidates
            .iter()
            .map(|candidate| Some(candidate.quality.complete)),
    );
    let states = StringArray::from_iter_values(
        candidates
            .iter()
            .map(|candidate| replica_state_name(candidate.quality.state)),
    );

    RecordBatch::try_new(
        Arc::new(Schema::new(vec![
            Field::new("range_start", DataType::Binary, false),
            Field::new("range_end", DataType::Binary, false),
            Field::new("replica", DataType::Utf8, false),
            Field::new("start_time", DataType::UInt64, false),
            Field::new("end_time", DataType::UInt64, false),
            Field::new("points", DataType::UInt64, false),
            Field::new("expected_points", DataType::UInt64, false),
            Field::new("complete", DataType::Boolean, false),
            Field::new("state", DataType::Utf8, false),
        ])),
        vec![
            Arc::new(range_starts) as ArrayRef,
            Arc::new(range_ends) as ArrayRef,
            Arc::new(replicas) as ArrayRef,
            Arc::new(start_times) as ArrayRef,
            Arc::new(end_times) as ArrayRef,
            Arc::new(points) as ArrayRef,
            Arc::new(expected_points) as ArrayRef,
            Arc::new(complete) as ArrayRef,
            Arc::new(states) as ArrayRef,
        ],
    )
}

/// Builds an Arrow [`RecordBatch`] holding one row per issue in `health.issues()`.
///
/// The batch has three non-nullable `Utf8` columns, in this order:
/// `child` (produced with `ToString`), `kind` (a snake_case label such as `"timed_out"`),
/// and `detail` (the issue's detail text).
///
/// # Errors
///
/// Propagates any [`ArrowError`] returned by [`RecordBatch::try_new`].
pub fn query_health_to_record_batch<ChildId>(
    health: &QueryHealth<ChildId>,
) -> Result<RecordBatch, ArrowError>
where
    ChildId: ToString,
{
    let schema = Schema::new(vec![
        Field::new("child", DataType::Utf8, false),
        Field::new("kind", DataType::Utf8, false),
        Field::new("detail", DataType::Utf8, false),
    ]);

    let issues = health.issues();

    let child_col: ArrayRef = Arc::new(StringArray::from_iter_values(
        issues.iter().map(|i| i.child.to_string()),
    ));
    let kind_col: ArrayRef = Arc::new(StringArray::from_iter_values(
        issues.iter().map(|i| issue_kind_name(i.kind)),
    ));
    let detail_col: ArrayRef = Arc::new(StringArray::from_iter_values(
        issues.iter().map(|i| i.detail.as_str()),
    ));

    RecordBatch::try_new(Arc::new(schema), vec![child_col, kind_col, detail_col])
}

/// Maps a [`ReplicaState`] to the lowercase label written to the exported `state` column.
///
/// The match is exhaustive, so adding a variant forces a label to be chosen here.
fn replica_state_name(state: ReplicaState) -> &'static str {
    match state {
        ReplicaState::Unavailable => "unavailable",
        ReplicaState::Recovering => "recovering",
        ReplicaState::Available => "available",
    }
}

/// Maps an [`IssueKind`] to the snake_case label written to the exported `kind` column.
///
/// The match is exhaustive, so adding a variant forces a label to be chosen here.
fn issue_kind_name(kind: IssueKind) -> &'static str {
    match kind {
        IssueKind::IncompleteData => "incomplete_data",
        IssueKind::MissingReplica => "missing_replica",
        IssueKind::StaleConfig => "stale_config",
        IssueKind::TimedOut => "timed_out",
        IssueKind::PrunedZone => "pruned_zone",
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ReplicaQuality, ReplicaResolver};

    #[test]
    fn finalized_buckets_export_as_record_batch() {
        // A single one-minute bucket for a single series.
        let input = vec![FinalizedBucket {
            key: "series-a",
            start: 0,
            end: 60,
            value: 42,
        }];

        let exported = finalized_u64_buckets_to_record_batch(&input).unwrap();
        assert_eq!(exported.num_rows(), 1);

        let schema = exported.schema();
        assert_eq!(schema.field(0).name(), "key");
    }

    #[test]
    fn replica_candidates_export_as_record_batch() {
        let key_range = b"a".to_vec()..b"m".to_vec();
        let quality = ReplicaQuality::new(0, 60, 60, 60, true, ReplicaState::Available);
        let candidates = vec![ReplicaCandidate::new(key_range, "leaf-a", quality)];

        let exported = replica_candidates_to_record_batch(&candidates).unwrap();
        assert_eq!(exported.num_rows(), 1);
        assert_eq!(exported.num_columns(), 9);

        // The same candidates still resolve normally after being exported.
        let resolved = ReplicaResolver::new().resolve(candidates);
        assert_eq!(resolved[0].primary, "leaf-a");
    }
}