// Metric keys for the buffer subsystem. Every key carries the `buffer.` prefix.
// The one-line summaries below mirror the descriptions attached in
// `describe_buffer_metrics` and `describe_consumer_metrics`.

// --- Flush side (described in `describe_buffer_metrics`) ---

/// Counter key: batches written to object store.
pub(crate) const BATCHES_FLUSHED: &str = "buffer.batches_flushed";
/// Counter key: entries across all flushed batches.
pub(crate) const ENTRIES_FLUSHED: &str = "buffer.entries_flushed";
/// Counter key: raw bytes flushed (pre-compression).
pub(crate) const BYTES_FLUSHED: &str = "buffer.bytes_flushed";
/// Counter key: bytes written to object store (post-compression).
pub(crate) const BYTES_WRITTEN: &str = "buffer.bytes_written";
/// Histogram key: flush latency in seconds (write + enqueue).
pub(crate) const FLUSH_DURATION_SECONDS: &str = "buffer.flush_duration_seconds";

// --- Collect side (described in `describe_consumer_metrics`) ---

/// Counter key: batches fetched from object store.
pub(crate) const BATCHES_COLLECTED: &str = "buffer.batches_collected";
/// Counter key: entries across collected batches.
pub(crate) const ENTRIES_COLLECTED: &str = "buffer.entries_collected";
/// Counter key: bytes read from object store.
pub(crate) const BYTES_COLLECTED: &str = "buffer.bytes_collected";
/// Histogram key: batch fetch latency from object store, in seconds.
pub(crate) const FETCH_DURATION_SECONDS: &str = "buffer.fetch_duration_seconds";
/// Gauge key: wall clock minus last batch ingestion time, in seconds.
pub(crate) const CONSUMER_LAG_SECONDS: &str = "buffer.consumer_lag_seconds";
/// Gauge key: entries in the manifest queue.
pub(crate) const QUEUE_LENGTH: &str = "buffer.queue_length";
/// Counter key: batch acks processed.
pub(crate) const ACKS: &str = "buffer.acks";

// --- Garbage collection (described in `describe_consumer_metrics`) ---

/// Counter key: batch files cleaned by GC.
pub(crate) const GC_FILES_DELETED: &str = "buffer.gc_files_deleted";
/// Counter key: failed GC file deletions.
pub(crate) const GC_FILES_FAILED: &str = "buffer.gc_files_failed";
/// Histogram key: GC cycle wall time in seconds.
pub(crate) const GC_DURATION_SECONDS: &str = "buffer.gc_duration_seconds";

// --- Manifest (described in both describe functions) ---

/// Counter key: manifest write attempts.
pub(crate) const MANIFEST_WRITES: &str = "buffer.manifest_writes";
/// Counter key: manifest CAS conflicts.
pub(crate) const MANIFEST_CONFLICTS: &str = "buffer.manifest_conflicts";
/// Attaches descriptions to the flush-side buffer metrics.
///
/// Covers the four flush volume counters, the flush latency histogram, and the
/// two manifest counters, worded here from the producer's point of view.
// NOTE(review): `MANIFEST_WRITES` and `MANIFEST_CONFLICTS` are also described in
// `describe_consumer_metrics` with different text. Confirm which description is
// expected to apply if both functions run against the same recorder.
pub(crate) fn describe_buffer_metrics() {
    // Flush volume.
    metrics::describe_counter!(BATCHES_FLUSHED, "Batches written to object store");
    metrics::describe_counter!(ENTRIES_FLUSHED, "Entries across all flushed batches");
    metrics::describe_counter!(BYTES_FLUSHED, "Raw bytes flushed (pre-compression)");
    metrics::describe_counter!(
        BYTES_WRITTEN,
        "Bytes written to object store (post-compression)"
    );

    // Flush latency.
    metrics::describe_histogram!(
        FLUSH_DURATION_SECONDS,
        "Flush latency in seconds (write + enqueue)"
    );

    // Manifest activity.
    metrics::describe_counter!(MANIFEST_WRITES, "Producer manifest write attempts");
    metrics::describe_counter!(MANIFEST_CONFLICTS, "Producer manifest CAS conflicts");
}
/// Attaches descriptions to the collect-side buffer metrics.
///
/// Covers the collect volume counters, the lag and queue-length gauges, the ack
/// counter, the GC counters and histogram, the fetch latency histogram, and the
/// two manifest counters, worded here from the consumer's point of view.
// NOTE(review): `MANIFEST_WRITES` and `MANIFEST_CONFLICTS` are also described in
// `describe_buffer_metrics` with different text. Confirm which description is
// expected to apply if both functions run against the same recorder.
pub(crate) fn describe_consumer_metrics() {
    // Collect volume.
    metrics::describe_counter!(BATCHES_COLLECTED, "Batches fetched from object store");
    metrics::describe_counter!(ENTRIES_COLLECTED, "Entries across collected batches");
    metrics::describe_counter!(BYTES_COLLECTED, "Bytes read from object store");

    // Consumer progress.
    metrics::describe_gauge!(
        CONSUMER_LAG_SECONDS,
        "Wall clock minus last batch ingestion time in seconds"
    );
    metrics::describe_gauge!(QUEUE_LENGTH, "Entries in the manifest queue");
    metrics::describe_counter!(ACKS, "Batch acks processed");

    // Garbage collection.
    metrics::describe_counter!(GC_FILES_DELETED, "Batch files cleaned by GC");
    metrics::describe_counter!(GC_FILES_FAILED, "Failed GC file deletions");
    metrics::describe_histogram!(GC_DURATION_SECONDS, "GC cycle wall time in seconds");

    // Fetch latency.
    metrics::describe_histogram!(
        FETCH_DURATION_SECONDS,
        "Batch fetch latency from object store in seconds"
    );

    // Manifest activity.
    metrics::describe_counter!(MANIFEST_WRITES, "Consumer manifest write attempts");
    metrics::describe_counter!(MANIFEST_CONFLICTS, "Consumer manifest CAS conflicts");
}