Skip to main content

scouter_settings/
storage.rs

1use crate::ScouterServerConfig;
2use scouter_types::StorageType;
3use serde::Serialize;
4use std::path::PathBuf;
5
6#[derive(Debug, Clone, Serialize)]
7pub struct ObjectStorageSettings {
8    pub storage_uri: String,
9    pub storage_type: StorageType,
10    pub region: String, // this is aws specific
11    /// How often the Delta Lake compaction (Z-ORDER optimize) runs for trace tables. Default: 24h.
12    pub trace_compaction_interval_hours: u64,
13    /// How often the span buffer flushes to Delta Lake. Default: 5s.
14    pub trace_flush_interval_secs: u64,
15    /// How often each pod refreshes the Delta table snapshot from shared storage. Default: 10s.
16    ///
17    /// In multi-pod deployments (e.g. K8s with a dedicated writer pod and reader pods),
18    /// the reader pods' in-memory Delta log snapshot becomes stale after the writer commits
19    /// new data. This ticker calls `update_incremental()` on the DeltaTable and re-registers
20    /// the SessionContext so queries return fresh results without a pod restart.
21    pub trace_refresh_interval_secs: u64,
22}
23
24impl Default for ObjectStorageSettings {
25    fn default() -> Self {
26        let storage_uri = std::env::var("SCOUTER_STORAGE_URI")
27            .unwrap_or_else(|_| "./scouter_storage".to_string());
28
29        let storage_type = ScouterServerConfig::get_storage_type(&storage_uri);
30
31        // need to set this for aws objectstore
32        let region = std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string());
33
34        let trace_compaction_interval_hours =
35            std::env::var("SCOUTER_TRACE_COMPACTION_INTERVAL_HOURS")
36                .ok()
37                .and_then(|v| v.parse().ok())
38                .unwrap_or(24u64);
39
40        let trace_flush_interval_secs = std::env::var("SCOUTER_TRACE_FLUSH_INTERVAL_SECS")
41            .ok()
42            .and_then(|v| v.parse().ok())
43            .unwrap_or(5u64);
44
45        let trace_refresh_interval_secs = std::env::var("SCOUTER_TRACE_REFRESH_INTERVAL_SECS")
46            .ok()
47            .and_then(|v| v.parse().ok())
48            .unwrap_or(10u64);
49
50        Self {
51            storage_uri,
52            storage_type,
53            region,
54            trace_compaction_interval_hours,
55            trace_flush_interval_secs,
56            trace_refresh_interval_secs,
57        }
58    }
59}
60
61impl ObjectStorageSettings {
62    /// Buffer size for trace span batching before flushing to Delta Lake.
63    ///
64    /// Configurable via `SCOUTER_TRACE_BUFFER_SIZE`. Larger values produce fewer,
65    /// bigger Parquet files — reducing Delta log replay cost and file-open overhead
66    /// on cloud storage at the expense of slightly longer flush intervals.
67    /// Default: 10,000 spans.
68    pub fn trace_buffer_size(&self) -> usize {
69        std::env::var("SCOUTER_TRACE_BUFFER_SIZE")
70            .ok()
71            .and_then(|v| v.parse().ok())
72            .unwrap_or(10_000)
73    }
74
75    /// Maximum size (in MB) of the object store range cache used to avoid
76    /// redundant cloud round-trips for immutable Parquet footer reads.
77    ///
78    /// Configurable via `SCOUTER_OBJECT_CACHE_MB`. Default: 64 MB.
79    pub fn object_cache_mb(&self) -> u64 {
80        std::env::var("SCOUTER_OBJECT_CACHE_MB")
81            .ok()
82            .and_then(|v| v.parse().ok())
83            .unwrap_or(64)
84    }
85
86    pub fn storage_root(&self) -> String {
87        match self.storage_type {
88            StorageType::Google | StorageType::Aws | StorageType::Azure => {
89                if let Some(stripped) = self.storage_uri.strip_prefix("gs://") {
90                    stripped.split('/').next().unwrap_or(stripped).to_string()
91                } else if let Some(stripped) = self.storage_uri.strip_prefix("s3://") {
92                    stripped.split('/').next().unwrap_or(stripped).to_string()
93                } else if let Some(stripped) = self.storage_uri.strip_prefix("az://") {
94                    stripped.split('/').next().unwrap_or(stripped).to_string()
95                } else {
96                    self.storage_uri.clone()
97                }
98            }
99            StorageType::Local => {
100                // For local storage, just return the path directly
101                self.storage_uri.clone()
102            }
103        }
104    }
105
106    pub fn canonicalized_path(&self) -> String {
107        // if registry is local canonicalize the path
108        if self.storage_type == StorageType::Local {
109            let path = PathBuf::from(&self.storage_uri);
110            if path.exists() {
111                path.canonicalize()
112                    .unwrap_or_else(|_| path.clone())
113                    .to_str()
114                    .unwrap()
115                    .to_string()
116            } else {
117                self.storage_uri.clone()
118            }
119        } else {
120            self.storage_uri.clone()
121        }
122    }
123}