Skip to main content

buffer/
config.rs

1use std::time::Duration;
2
3use common::ObjectStoreConfig;
4use serde::{Deserialize, Serialize};
5use serde_with::{DurationMilliSeconds, serde_as};
6
7use crate::model::CompressionType;
8
9/// Configuration for a [`Producer`](crate::Producer).
10///
11/// Controls where data batches and the queue manifest are stored, how often
12/// batches are flushed, and when backpressure is applied.
13#[serde_as]
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct ProducerConfig {
16    /// Determines where and how ingest data is persisted. See [`ObjectStoreConfig`].
17    pub object_store: ObjectStoreConfig,
18
19    /// Path prefix for data batch objects in object storage.
20    ///
21    /// Defaults to `"ingest"`.
22    #[serde(default = "default_data_path_prefix")]
23    pub data_path_prefix: String,
24
25    /// Path to the queue manifest in object storage.
26    ///
27    /// Defaults to `"ingest/manifest"`.
28    #[serde(default = "default_manifest_path")]
29    pub manifest_path: String,
30
31    /// Time interval that triggers the flush of the current batch to object storage when elapsed.
32    ///
33    /// Defaults to 100 ms.
34    #[serde_as(as = "DurationMilliSeconds<u64>")]
35    #[serde(default = "default_flush_interval")]
36    pub flush_interval: Duration,
37
38    /// Batch size in bytes (entries and metadata) that triggers a flush when exceeded.
39    ///
40    /// Defaults to 64 MiB.
41    #[serde(default = "default_flush_size_bytes")]
42    pub flush_size_bytes: usize,
43
44    /// Maximum number of input entries vectors that can be buffered for the background
45    /// batch writer before backpressure is applied.
46    ///
47    /// Defaults to 1000.
48    #[serde(default = "default_max_buffered_inputs")]
49    pub max_buffered_inputs: usize,
50
51    /// Compression algorithm applied to the record block in data batches.
52    ///
53    /// Defaults to `None` (uncompressed).
54    #[serde(default)]
55    pub batch_compression: CompressionType,
56}
57
58/// Configuration for a [`Consumer`](crate::Consumer).
59///
60/// Controls where the queue manifest and data batches are read from.
61#[serde_as]
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct ConsumerConfig {
64    /// Determines where and how ingest data is read. See [`ObjectStoreConfig`].
65    pub object_store: ObjectStoreConfig,
66
67    /// Path to the queue manifest in object storage.
68    ///
69    /// Defaults to `"ingest/manifest"`.
70    #[serde(default = "default_manifest_path")]
71    pub manifest_path: String,
72
73    /// Path prefix for data batch objects in object storage.
74    ///
75    /// Must match the producer's `data_path_prefix`. Defaults to `"ingest"`.
76    #[serde(default = "default_data_path_prefix")]
77    pub data_path_prefix: String,
78
79    /// How often garbage collection runs.
80    ///
81    /// Defaults to 5 minutes.
82    #[serde_as(as = "DurationMilliSeconds<u64>")]
83    #[serde(default = "default_gc_interval")]
84    pub gc_interval: Duration,
85
86    /// Minimum age of an unreferenced batch file before it is eligible for
87    /// deletion by the garbage collector.
88    ///
89    /// Defaults to 10 minutes.
90    #[serde_as(as = "DurationMilliSeconds<u64>")]
91    #[serde(default = "default_gc_grace_period")]
92    pub gc_grace_period: Duration,
93}
94
/// Serde default for the object-storage prefix of data batch objects.
fn default_data_path_prefix() -> String {
    String::from("ingest")
}
98
/// Serde default for the object-storage path of the queue manifest.
fn default_manifest_path() -> String {
    "ingest/manifest".to_owned()
}
102
/// Serde default for the producer's time-based flush trigger: 100 ms.
fn default_flush_interval() -> Duration {
    const FLUSH_INTERVAL_MILLIS: u64 = 100;
    Duration::from_millis(FLUSH_INTERVAL_MILLIS)
}
106
/// Serde default for the producer's size-based flush trigger: 64 MiB.
fn default_flush_size_bytes() -> usize {
    const MIB: usize = 1 << 20;
    64 * MIB
}
110
/// Serde default for how many input entry vectors may be buffered before backpressure.
fn default_max_buffered_inputs() -> usize {
    const MAX_BUFFERED: usize = 1_000;
    MAX_BUFFERED
}
/// Serde default for the period between garbage-collection runs: 5 minutes.
///
/// Built with `Duration::from_secs` instead of `Duration::from_mins`; the latter only
/// became stable in a very recent Rust release, and the resulting value is identical.
fn default_gc_interval() -> Duration {
    const SECS_PER_MIN: u64 = 60;
    Duration::from_secs(5 * SECS_PER_MIN)
}
117
/// Serde default for the minimum age of an unreferenced batch before GC may delete it:
/// 10 minutes.
///
/// Built with `Duration::from_secs` instead of `Duration::from_mins`; the latter only
/// became stable in a very recent Rust release, and the resulting value is identical.
fn default_gc_grace_period() -> Duration {
    const SECS_PER_MIN: u64 = 60;
    Duration::from_secs(10 * SECS_PER_MIN)
}