ferrokinesis 0.7.0

A local AWS Kinesis mock server for testing, written in Rust
Documentation
use crate::actions::Operation;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::sync::Mutex;

const ALL_OPERATIONS: &[Operation] = &[
    Operation::AddTagsToStream,
    Operation::CreateStream,
    Operation::DecreaseStreamRetentionPeriod,
    Operation::DeleteResourcePolicy,
    Operation::DeleteStream,
    Operation::DeregisterStreamConsumer,
    Operation::DescribeAccountSettings,
    Operation::DescribeLimits,
    Operation::DescribeStream,
    Operation::DescribeStreamConsumer,
    Operation::DescribeStreamSummary,
    Operation::DisableEnhancedMonitoring,
    Operation::EnableEnhancedMonitoring,
    Operation::GetRecords,
    Operation::GetResourcePolicy,
    Operation::GetShardIterator,
    Operation::IncreaseStreamRetentionPeriod,
    Operation::ListShards,
    Operation::ListStreamConsumers,
    Operation::ListStreams,
    Operation::ListTagsForResource,
    Operation::ListTagsForStream,
    Operation::MergeShards,
    Operation::PutRecord,
    Operation::PutRecords,
    Operation::PutResourcePolicy,
    Operation::RegisterStreamConsumer,
    Operation::RemoveTagsFromStream,
    Operation::SplitShard,
    Operation::StartStreamEncryption,
    Operation::StopStreamEncryption,
    Operation::SubscribeToShard,
    Operation::TagResource,
    Operation::UntagResource,
    Operation::UpdateAccountSettings,
    Operation::UpdateMaxRecordSize,
    Operation::UpdateShardCount,
    Operation::UpdateStreamMode,
    Operation::UpdateStreamWarmThroughput,
];

const ALL_PRE_OPERATION_FAILURE_REASONS: &[PreOperationFailureReason] = &[
    PreOperationFailureReason::AccessDenied,
    PreOperationFailureReason::IncompleteSignature,
    PreOperationFailureReason::InvalidSignature,
    PreOperationFailureReason::MissingAuthToken,
    PreOperationFailureReason::SerializationException,
    PreOperationFailureReason::UnknownOperation,
];

#[derive(Debug, Clone, Copy)]
pub enum PreOperationFailureReason {
    AccessDenied,
    IncompleteSignature,
    InvalidSignature,
    MissingAuthToken,
    SerializationException,
    UnknownOperation,
}

#[derive(Debug)]
pub struct AppMetrics {
    retained_bytes: AtomicU64,
    retained_records: AtomicU64,
    rejected_writes_total: AtomicU64,
    mirror_dropped_total: AtomicU64,
    replay_complete: AtomicBool,
    last_snapshot_ms: AtomicU64,
    last_request_duration_micros: AtomicU64,
    current_streams: AtomicU64,
    current_shards: AtomicU64,
    request_counters: Vec<OperationMetrics>,
    pre_operation_failure_counters: Vec<AtomicU64>,
    active_iterators: Mutex<VecDeque<u64>>,
    iterator_ttl_ms: u64,
}

fn operation_name(operation: &Operation) -> &'static str {
    match operation {
        Operation::AddTagsToStream => "AddTagsToStream",
        Operation::CreateStream => "CreateStream",
        Operation::DecreaseStreamRetentionPeriod => "DecreaseStreamRetentionPeriod",
        Operation::DeleteResourcePolicy => "DeleteResourcePolicy",
        Operation::DeleteStream => "DeleteStream",
        Operation::DeregisterStreamConsumer => "DeregisterStreamConsumer",
        Operation::DescribeAccountSettings => "DescribeAccountSettings",
        Operation::DescribeLimits => "DescribeLimits",
        Operation::DescribeStream => "DescribeStream",
        Operation::DescribeStreamConsumer => "DescribeStreamConsumer",
        Operation::DescribeStreamSummary => "DescribeStreamSummary",
        Operation::DisableEnhancedMonitoring => "DisableEnhancedMonitoring",
        Operation::EnableEnhancedMonitoring => "EnableEnhancedMonitoring",
        Operation::GetRecords => "GetRecords",
        Operation::GetResourcePolicy => "GetResourcePolicy",
        Operation::GetShardIterator => "GetShardIterator",
        Operation::IncreaseStreamRetentionPeriod => "IncreaseStreamRetentionPeriod",
        Operation::ListShards => "ListShards",
        Operation::ListStreamConsumers => "ListStreamConsumers",
        Operation::ListStreams => "ListStreams",
        Operation::ListTagsForResource => "ListTagsForResource",
        Operation::ListTagsForStream => "ListTagsForStream",
        Operation::MergeShards => "MergeShards",
        Operation::PutRecord => "PutRecord",
        Operation::PutRecords => "PutRecords",
        Operation::PutResourcePolicy => "PutResourcePolicy",
        Operation::RegisterStreamConsumer => "RegisterStreamConsumer",
        Operation::RemoveTagsFromStream => "RemoveTagsFromStream",
        Operation::SplitShard => "SplitShard",
        Operation::StartStreamEncryption => "StartStreamEncryption",
        Operation::StopStreamEncryption => "StopStreamEncryption",
        Operation::SubscribeToShard => "SubscribeToShard",
        Operation::TagResource => "TagResource",
        Operation::UntagResource => "UntagResource",
        Operation::UpdateAccountSettings => "UpdateAccountSettings",
        Operation::UpdateMaxRecordSize => "UpdateMaxRecordSize",
        Operation::UpdateShardCount => "UpdateShardCount",
        Operation::UpdateStreamMode => "UpdateStreamMode",
        Operation::UpdateStreamWarmThroughput => "UpdateStreamWarmThroughput",
    }
}

fn pre_operation_failure_reason_name(reason: PreOperationFailureReason) -> &'static str {
    match reason {
        PreOperationFailureReason::AccessDenied => "access_denied",
        PreOperationFailureReason::IncompleteSignature => "incomplete_signature",
        PreOperationFailureReason::InvalidSignature => "invalid_signature",
        PreOperationFailureReason::MissingAuthToken => "missing_auth_token",
        PreOperationFailureReason::SerializationException => "serialization_exception",
        PreOperationFailureReason::UnknownOperation => "unknown_operation",
    }
}

#[derive(Debug)]
struct OperationMetrics {
    ok_total: AtomicU64,
    error_total: AtomicU64,
    duration_count: AtomicU64,
    duration_micros_sum: AtomicU64,
}

impl Default for OperationMetrics {
    fn default() -> Self {
        Self {
            ok_total: AtomicU64::new(0),
            error_total: AtomicU64::new(0),
            duration_count: AtomicU64::new(0),
            duration_micros_sum: AtomicU64::new(0),
        }
    }
}

impl AppMetrics {
    pub fn new(iterator_ttl_seconds: u64) -> Arc<Self> {
        let request_counters = std::iter::repeat_with(OperationMetrics::default)
            .take(ALL_OPERATIONS.len())
            .collect();
        Arc::new(Self {
            retained_bytes: AtomicU64::new(0),
            retained_records: AtomicU64::new(0),
            rejected_writes_total: AtomicU64::new(0),
            mirror_dropped_total: AtomicU64::new(0),
            replay_complete: AtomicBool::new(true),
            last_snapshot_ms: AtomicU64::new(0),
            last_request_duration_micros: AtomicU64::new(0),
            current_streams: AtomicU64::new(0),
            current_shards: AtomicU64::new(0),
            request_counters,
            pre_operation_failure_counters: std::iter::repeat_with(|| AtomicU64::new(0))
                .take(ALL_PRE_OPERATION_FAILURE_REASONS.len())
                .collect(),
            active_iterators: Mutex::new(VecDeque::new()),
            iterator_ttl_ms: iterator_ttl_seconds.saturating_mul(1000),
        })
    }

    pub fn set_retained(&self, bytes: u64, records: u64) {
        self.retained_bytes.store(bytes, Ordering::Relaxed);
        self.retained_records.store(records, Ordering::Relaxed);
    }

    pub fn add_retained(&self, bytes: u64, records: u64) {
        self.retained_bytes.fetch_add(bytes, Ordering::Relaxed);
        self.retained_records.fetch_add(records, Ordering::Relaxed);
    }

    pub fn remove_retained(&self, bytes: u64, records: u64) {
        let _ = self
            .retained_bytes
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                Some(current.saturating_sub(bytes))
            });
        let _ =
            self.retained_records
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                    Some(current.saturating_sub(records))
                });
    }

    pub fn retained_bytes(&self) -> u64 {
        self.retained_bytes.load(Ordering::Relaxed)
    }

    pub fn retained_records(&self) -> u64 {
        self.retained_records.load(Ordering::Relaxed)
    }

    pub fn increment_rejected_writes(&self) {
        self.rejected_writes_total.fetch_add(1, Ordering::Relaxed);
    }

    pub fn increment_mirror_dropped(&self) {
        self.mirror_dropped_total.fetch_add(1, Ordering::Relaxed);
    }

    pub fn set_replay_complete(&self, value: bool) {
        self.replay_complete.store(value, Ordering::Relaxed);
    }

    pub fn set_last_snapshot_ms(&self, value: u64) {
        self.last_snapshot_ms.store(value, Ordering::Relaxed);
    }

    pub fn set_topology(&self, streams: u64, shards: u64) {
        self.current_streams.store(streams, Ordering::Relaxed);
        self.current_shards.store(shards, Ordering::Relaxed);
    }

    pub fn record_request(&self, operation: Operation, ok: bool, duration_micros: u64) {
        let op = &self.request_counters[operation as usize];
        if ok {
            op.ok_total.fetch_add(1, Ordering::Relaxed);
        } else {
            op.error_total.fetch_add(1, Ordering::Relaxed);
        }
        op.duration_count.fetch_add(1, Ordering::Relaxed);
        op.duration_micros_sum
            .fetch_add(duration_micros, Ordering::Relaxed);
        self.last_request_duration_micros
            .store(duration_micros, Ordering::Relaxed);
    }

    pub fn record_pre_operation_failure(
        &self,
        reason: PreOperationFailureReason,
        duration_micros: u64,
    ) {
        self.pre_operation_failure_counters[reason as usize].fetch_add(1, Ordering::Relaxed);
        self.last_request_duration_micros
            .store(duration_micros, Ordering::Relaxed);
    }

    pub async fn record_iterator(&self, now_ms: u64) {
        let mut active = self.active_iterators.lock().await;
        active.push_back(now_ms);
        self.prune_iterators_locked(&mut active, now_ms);
    }

    pub async fn active_iterator_count(&self, now_ms: u64) -> usize {
        let mut active = self.active_iterators.lock().await;
        self.prune_iterators_locked(&mut active, now_ms);
        active.len()
    }

    pub async fn render(&self, now_ms: u64) -> String {
        let active_iterators = self.active_iterator_count(now_ms).await;
        let mut out = String::new();
        out.push_str("# TYPE ferrokinesis_retained_bytes gauge\n");
        out.push_str(&format!(
            "ferrokinesis_retained_bytes {}\n",
            self.retained_bytes()
        ));
        out.push_str("# TYPE ferrokinesis_retained_records gauge\n");
        out.push_str(&format!(
            "ferrokinesis_retained_records {}\n",
            self.retained_records()
        ));
        out.push_str("# TYPE ferrokinesis_rejected_writes_total counter\n");
        out.push_str(&format!(
            "ferrokinesis_rejected_writes_total {}\n",
            self.rejected_writes_total.load(Ordering::Relaxed)
        ));
        out.push_str("# TYPE ferrokinesis_mirror_dropped_total counter\n");
        out.push_str(&format!(
            "ferrokinesis_mirror_dropped_total {}\n",
            self.mirror_dropped_total.load(Ordering::Relaxed)
        ));
        out.push_str("# TYPE ferrokinesis_replay_complete gauge\n");
        out.push_str(&format!(
            "ferrokinesis_replay_complete {}\n",
            u8::from(self.replay_complete.load(Ordering::Relaxed))
        ));
        out.push_str("# TYPE ferrokinesis_last_snapshot_timestamp_ms gauge\n");
        out.push_str(&format!(
            "ferrokinesis_last_snapshot_timestamp_ms {}\n",
            self.last_snapshot_ms.load(Ordering::Relaxed)
        ));
        out.push_str("# TYPE ferrokinesis_streams gauge\n");
        out.push_str(&format!(
            "ferrokinesis_streams {}\n",
            self.current_streams.load(Ordering::Relaxed)
        ));
        out.push_str("# TYPE ferrokinesis_open_shards gauge\n");
        out.push_str(&format!(
            "ferrokinesis_open_shards {}\n",
            self.current_shards.load(Ordering::Relaxed)
        ));
        out.push_str("# TYPE ferrokinesis_active_iterators gauge\n");
        out.push_str(&format!(
            "ferrokinesis_active_iterators {}\n",
            active_iterators
        ));
        out.push_str("# TYPE ferrokinesis_request_failures_total counter\n");
        for (index, reason) in ALL_PRE_OPERATION_FAILURE_REASONS.iter().enumerate() {
            out.push_str(&format!(
                "ferrokinesis_request_failures_total{{reason=\"{}\"}} {}\n",
                pre_operation_failure_reason_name(*reason),
                self.pre_operation_failure_counters[index].load(Ordering::Relaxed)
            ));
        }

        for (index, operation) in ALL_OPERATIONS.iter().enumerate() {
            let metrics = &self.request_counters[index];
            let op = operation_name(operation);
            out.push_str(&format!(
                "ferrokinesis_requests_total{{operation=\"{op}\",result=\"ok\"}} {}\n",
                metrics.ok_total.load(Ordering::Relaxed)
            ));
            out.push_str(&format!(
                "ferrokinesis_requests_total{{operation=\"{op}\",result=\"error\"}} {}\n",
                metrics.error_total.load(Ordering::Relaxed)
            ));
            out.push_str(&format!(
                "ferrokinesis_request_duration_seconds_count{{operation=\"{op}\"}} {}\n",
                metrics.duration_count.load(Ordering::Relaxed)
            ));
            out.push_str(&format!(
                "ferrokinesis_request_duration_seconds_sum{{operation=\"{op}\"}} {}\n",
                metrics.duration_micros_sum.load(Ordering::Relaxed) as f64 / 1_000_000.0
            ));
        }

        out
    }

    fn prune_iterators_locked(&self, active: &mut VecDeque<u64>, now_ms: u64) {
        while let Some(created_at) = active.front().copied() {
            if now_ms.saturating_sub(created_at) <= self.iterator_ttl_ms {
                break;
            }
            active.pop_front();
        }
    }
}